
Conversation

@WeichenXu123 (Contributor) commented Sep 4, 2018

What changes were proposed in this pull request?

Implement an image schema data source.

This image data source supports:

  • partition discovery (loading partitioned images)
  • dropImageFailures (the same behavior as ImageSchema.readImages)
  • path wildcard matching (the same behavior as ImageSchema.readImages)
  • loading recursively from a directory (unlike ImageSchema.readImages; use a path such as /path/to/dir/**)

This data source does NOT support:

  • specifying numPartitions (it is determined automatically by the data source)
  • sampling (you can call df.sample afterwards, but the sampling operator won't be pushed down to the data source; see the usage sketch below)
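A minimal usage sketch in Scala (the path and sample fraction below are illustrative, not taken from this PR):

// Load images recursively via a wildcard path; files that are not valid images are dropped.
val images = spark.read.format("image")
  .option("dropImageFailures", "true")
  .load("/path/to/dir/**")

// numPartitions cannot be set on the reader and sampling is not pushed down,
// so repartition and sample the resulting DataFrame instead:
val sampled = images.repartition(16).sample(0.1)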

How was this patch tested?

Unit tests.

Benchmark

I benchmarked and compared the time cost of the old ImageSchema.read API and the new image data source.

Cluster: 4 nodes, each with 64 GB memory and an 8-core CPU
Test dataset: Flickr8k_Dataset (about 8091 images)

Time cost:

  • My image datasource time (automatically generate 258 partitions): 38.04s
  • ImageSchema.read time (set 16 partitions): 68.4s
  • ImageSchema.read time (set 258 partitions): 90.6s

Time cost when the number of images is doubled (clone Flickr8k_Dataset and load twice as many images):

  • My image datasource time (automatically generate 515 partitions): 95.4s
  • ImageSchema.read (set 32 partitions): 109s
  • ImageSchema.read (set 515 partitions): 105s

So we can see that the image data source implementation in this PR brings some performance improvement compared with the old ImageSchema.read API.

@SparkQA commented Sep 4, 2018

Test build #95667 has finished for PR 22328 at commit 5b5aee6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1 +1,2 @@
org.apache.spark.ml.source.libsvm.LibSVMFileFormat
org.apache.spark.ml.source.image.ImageFileFormat
Member

tiny nit: newline at the end - this might be caught by maven's checkstyle (#21801)

* @note This class is public for documentation purpose. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
*/
class ImageDataSource private() {}
(No newline at end of file)
Member

newline at the end - technically this should be caught by scalastyle.

import org.apache.spark.util.SerializableConfiguration


private[image] class ImageFileFormatOptions(
Member

Shall we make another file for this for consistency?

"org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"org.apache.spark.ml.source.image.ImageFileFormat.DefaultSource" -> image,
Member

Since this is a new data source, we probably don't need this entry in the backward-compatibility map.

job: Job, options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
throw new UnsupportedOperationException(
s"prepareWrite is not supported for image data source")
Member

tiny nit: s can be removed

Contributor

The error message is user-facing and users do not know prepareWrite. So just say "Write is not supported"

@HyukjinKwon (Member)

cc @imatiach-msft

* - "dropImageFailures": Whether to drop the files that are not valid images from the result.
*
* @note This class is public for documentation purpose. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
Member

@WeichenXu123, don't we plan to add documentation on the site?

Contributor

I didn't see a section in the docs that lists all built-in data sources. It would be nice if we created such a section and linked it to this API doc. I think we can do that in a follow-up PR. I want to see if we can get this PR merged before the branch cut :)

@SparkQA commented Sep 4, 2018

Test build #95668 has finished for PR 22328 at commit 5164d19.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imatiach-msft (Contributor)

question: why were the images moved into an images/images folder from just images? It seems a bit strange to me to have the same folder name twice.

* - "dropImageFailures": Whether to drop the files that are not valid images from the result.
*
* @note This class is public for documentation purpose. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a section in the doc that lists all built-in data sources. It would be nice if we create a section and link it to this API doc. I think we can do it with a follow-up PR. I want to see if we can get this PR merged before branch cut:)

job: Job, options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
throw new UnsupportedOperationException(
s"prepareWrite is not supported for image data source")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message is user-facing and users do not know prepareWrite. So just say "Write is not supported"

class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
// Single column of images named "image"
private lazy val imagePath = "../data/mllib/images"
private lazy val imagePath = "../data/mllib/images/images"
Contributor

"images/images" is confusing. Call it images/origin and images/partitioned

val mode = getMode(row)
val bytes20 = getData(row).slice(0, 20).toList
filename -> Tuple2(mode, bytes20)
}.toMap
Contributor

use Set instead of Map

@mengxr (Contributor) commented Sep 4, 2018

@imatiach-msft @HyukjinKwon The plan is to mark ImageSchema deprecated in 2.4 and remove it in 3.0. So loading images will be the same as loading data from other sources.

The gaps are sampling and partition control, which might require more testing after 2.4. It would be great if you could help. For sampling, I'm thinking of allowing the data source to handle sample operations. @cloud-fan, is it feasible?

@imatiach-msft (Contributor) left a comment

added some comments

@@ -0,0 +1,13 @@
The images in the folder "kittens" are under the creative commons CC0 license, or no rights reserved:
Contributor

not sure why this was added to images/images folder, also strange that it wasn't moved

Contributor

update to comment above:
I don't think you need duplicate images in the PR - in the old tests you can just specify the path up to the cls/date folder. There might be a few minor changes to the tests, but I think that would be a better strategy than having duplicate images in the source.

Contributor Author

No worries. There are only a few images, and keeping the old test cases unchanged will help this PR get merged ASAP.

* - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
*
* To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and
* optionally specify options, for example:
Contributor

tiny nit: although it makes sense, "optionally specify options" is a bit confusing, maybe "optionally specify arguments" or just "specify options"?

Contributor Author

The latter "options" is "datasource options", it is the widely used term.
So I prefer to change to "optionally specify the datasource options"

* optionally specify options, for example:
* {{{
* // Scala
* val df = spark.read.format("image")
Contributor

this is very cool!


override def shortName(): String = "image"

override protected def buildReader(
Contributor

hmm, is there any way we could combine the two APIs? I don't like having to support two different implementations. Or, what is the issue blocking us from combining them?


// Images with the different number of channels
test("readImages pixel values test") {

Contributor

tiny nit: remove newline (or make all tests consistent in terms of formatting)

requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
Contributor

should the sampling option be ported as well? It seemed like an important option in case users didn't want to load all images.

Contributor

It won't be addressed in this PR. The best way to support it is to allow the data source to handle the sampling operation. cc @cloud-fan

Contributor

Sample pushdown should be supported by data source v2 in the next release; then we can migrate the image source to data source v2.

@imatiach-msft (Contributor)

"specify numPartitions (it will be determined by datasource automatically)"
This feature existed because of this bug:

https://issues.apache.org/jira/browse/SPARK-22357

which was just fixed a week ago in Spark 2.4.0.
It can be removed, even in the current API.

@imatiach-msft (Contributor)

@mengxr "is to mark ImageSchema deprecated in 2.4 and remove it in 3.0"
confused, this is still using ImageSchema though, right? You mean to remove the top-level readImages API, but the ImageSchema will remain the same, correct?

@imatiach-msft (Contributor)

I don't think you need duplicate images in the PR - in the old tests you can just specify the path up to the cls/date folder. There might be a few minor changes to the tests, but I think that would be a better strategy than having duplicate images in the source.

@mengxr (Contributor) commented Sep 4, 2018

Yes, the ImageSchema implementation is used by the data source, so we cannot remove it :) We are only going to mark the public APIs there as deprecated. The goal is to provide users a unified approach to loading data into Spark. Users usually find ImageSchema.readImages hard to discover.

@imatiach-msft (Contributor)

@mhamilton723, could you take a look at this PR? Mark added some performance improvements in MMLSpark that we wanted to merge in, and he also added support for streaming (this was one of the PRs: https://github.com/Azure/mmlspark/pull/134/files ; there were a couple more after). He also had some concerns about performance (specifically, how we were storing the images as OpenCV bytes in the DataFrame, which he said took a lot of memory; we should use the more compressed format instead), and I recall he had a few suggestions on how we could improve it in the future. This seems like a good place to discuss how we could improve performance further.

* `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`.
* The loaded `DataFrame` has one `StructType` column: `image`.
* The schema of the `image` column is:
* - origin: String (represent the origin of image. If loaded from file, then it is file path)
Contributor

"represents" + "the image". I can see many missing as and thes in the description :(

* {{{
* // Scala
* val df = spark.read.format("image")
* .option("dropImageFailures", "true")
Contributor

true as a boolean value, please.

Contributor Author

The option API requires (k: String, v: String) parameters.

Contributor

Really? What about option(key: String, value: Boolean): DataFrameReader then? There are more --> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
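For reference, a small sketch of the two overloads being discussed (the load path is illustrative):

// String-valued option, as currently written in this PR:
spark.read.format("image").option("dropImageFailures", "true").load("/path/to/images")

// Boolean-valued overload of DataFrameReader.option, as suggested here:
spark.read.format("image").option("dropImageFailures", true).load("/path/to/images")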


override def prepareWrite(
sparkSession: SparkSession,
job: Job, options: Map[String, String],
Contributor

New line after job: Job

val df1 = spark.read.format("image").load(imagePath)
assert(df1.count === 9)

val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath)
Contributor

true as a boolean value, please.

Contributor Author

ditto.

val filename = Paths.get(getOrigin(row)).getFileName().toString()
val mode = getMode(row)
val bytes20 = getData(row).slice(0, 20).toList
filename -> Tuple2(mode, bytes20)
Contributor

Why is Tuple2 required here? Wouldn't (mode, bytes20) work here?

Contributor Author

yea, (mode, bytes20) doesn't work: filename -> (mode, bytes20) is compiled as filename.->(mode, bytes20), so -> receives 2 arguments and a compile error occurs.
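A short illustration of the parsing issue described above (the values are made up for the example):

val filename = "cat.jpg"
val mode = 16
val bytes20 = List(0x42, 0x4d).map(_.toByte)

// filename -> (mode, bytes20) is parsed as filename.->(mode, bytes20), i.e. two arguments
// to ->, which expects a single argument, hence the compile failure described above.
// Either of the following builds the intended (String, (Int, List[Byte])) pair:
val a = filename -> Tuple2(mode, bytes20)   // what the PR uses
val b = filename -> ((mode, bytes20))       // extra parentheses force a single tuple argument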

Contributor

nit: It is useful to leave an inline comment here:)

val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
val image = "org.apache.spark.ml.source.image.ImageFileFormat"
Contributor

Why is this needed?

Contributor Author

similar to libsvm.

Contributor

We did it for libsvm for historical reasons. Since image is a new source, I don't think we have compatibility issues.

@mhamilton723

@WeichenXu123, awesome work! I have not had a chance to go through this in depth, but I did this in the originating project, MMLSpark, a while back and have been meaning to ship it up to Spark Core:

https://github.com/Azure/mmlspark/tree/master/src/io/image

We built ours on the same functionality for Binary Files and it might be wise to do the same here if you have not already. The Binary File API is a bit more general as it supports reading many different kinds of files.

https://github.com/Azure/mmlspark/tree/master/src/io/binary

@mengxr (Contributor) commented Sep 5, 2018

@mhamilton723 I thought about that option too. Loading general binary files is a useful feature but I don't feel it is necessary to pull it into the current scope. No matter whether the image data source has its own implementation or builds on top of the binary data source, I expect users to use

spark.read.format("image").load("...")

to read images instead of something like:

spark.read.format("binary").load("...").withColumn("image", decode($"binary"))

So we can definitely add a binary file data source later and swap the implementation without changing the public interface. But we don't need to block this PR from getting into 2.4, which will be cut soon.

Sounds good?

*
* To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and
* optionally specify options, for example:
* optionally specify the datasource options, for example:
Contributor

s/datasource/data source

* IMAGE data source supports the following options:
* - "dropImageFailures": Whether to drop the files that are not valid images from the result.
*
* @note This IMAGE data source does not support "write".
Contributor

s/"write"/saving images to a file(s)/ ?


def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
Contributor

Why is false a String and not a boolean?

Contributor Author

Because parameters is of type Map[String, String].
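For context, a brief sketch of why the default stays a string: data source options reach the file format as a Map[String, String] (even values set with the Boolean overload of DataFrameReader.option are stored as strings), so the option class has to parse booleans itself.

// Options arrive as a Map[String, String], so the default is a string and .toBoolean parses it:
val parameters: Map[String, String] = Map("dropImageFailures" -> "true")

val dropImageFailures: Boolean =
  parameters.getOrElse("dropImageFailures", "false").toBoolean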

* @note This class is public for documentation purpose. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
*/
class ImageDataSource private() {}
Contributor

why do we need this class?

Contributor Author

For the doc; similar to LibSVMDataSource.

Contributor

Is this a convention? AFAIK in the Scala world we usually put documentation in the package object.

Member

Can we just simply remove this and make a follow-up for the doc on the site?

Contributor

Re: @cloud-fan The Scala package doc doesn't work for Java, which requires a different format.

Re: @HyukjinKwon It would be nice to have some doc on the site, though I didn't find a list of built-in data sources on the doc site. I think it is okay to have docs in both locations, for IDE users and for people searching on the web.

@SparkQA commented Sep 5, 2018

Test build #95694 has finished for PR 22328 at commit bd6178c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 5, 2018

Test build #95695 has finished for PR 22328 at commit 4d52754.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 5, 2018

Test build #95707 has finished for PR 22328 at commit 3fffd7e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 5, 2018

Test build #95715 has finished for PR 22328 at commit 3c8863c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

package org.apache.spark.ml.source.image

/**
* `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`.
Contributor

"IMAGE" doesn't need to be all uppercase. Just say "loading images".

* The loaded `DataFrame` has one `StructType` column: `image`.
* The schema of the `image` column is:
* - origin: String (represents the origin of the image.
* If loaded from files, then it is the file path)
Contributor

does it always load from files?

* - mode: Int (OpenCV-compatible type)
* - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
*
* To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and
Contributor

ditto on "IMAGE"

* }}}
*
* IMAGE data source supports the following options:
* - "dropImageFailures": Whether to drop the files that are not valid images from the result.
Contributor

How about changing dropImageFailures to dropInvalid?


def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
Contributor

Should add ScalaDoc.

@mengxr (Contributor) commented Sep 5, 2018

LGTM pending tests.

@SparkQA commented Sep 5, 2018

Test build #95724 has finished for PR 22328 at commit 218ce4c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr (Contributor) commented Sep 5, 2018

test this please

@mengxr (Contributor) commented Sep 5, 2018

The image data source tests passed but JVM crashed at the end. Triggered another test.

@SparkQA commented Sep 5, 2018

Test build #95727 has finished for PR 22328 at commit 218ce4c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 9254492 on Sep 5, 2018
@mengxr (Contributor) commented Sep 5, 2018

Merged into master. Thanks @WeichenXu123 for the implementation and everyone for the review! I created the following JIRAs as follow-ups:

@WeichenXu123 deleted the image_datasource branch on September 6, 2018 at 01:40