Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions data/mllib/images/origin/license.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
The images in the folder "kittens" are under the creative commons CC0 license, or no rights reserved:
https://creativecommons.org/share-your-work/public-domain/cc0/
The images are taken from:
https://ccsearch.creativecommons.org/image/detail/WZnbJSJ2-dzIDiuUUdto3Q==
https://ccsearch.creativecommons.org/image/detail/_TlKu_rm_QrWlR0zthQTXA==
https://ccsearch.creativecommons.org/image/detail/OPNnHJb6q37rSZ5o_L5JHQ==
https://ccsearch.creativecommons.org/image/detail/B2CVP_j5KjwZm7UAVJ3Hvw==

The chr30.4.184.jpg and grayscale.jpg images are also under the CC0 license, taken from:
https://ccsearch.creativecommons.org/image/detail/8eO_qqotBfEm2UYxirLntw==

The image under "multi-channel" directory is under the CC BY-SA 4.0 license cropped from:
https://en.wikipedia.org/wiki/Alpha_compositing#/media/File:Hue_alpha_falloff.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
not an image
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.apache.spark.ml.source.libsvm.LibSVMFileFormat
org.apache.spark.ml.source.image.ImageFileFormat
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.source.image

/**
* `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"IMAGE" doesn't need to be all uppercase. Just say "loading images".

* The loaded `DataFrame` has one `StructType` column: `image`.
* The schema of the `image` column is:
* - origin: String (represents the origin of the image.
* If loaded from files, then it is the file path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it always load from files?

* - height: Int (height of the image)
* - width: Int (width of the image)
* - nChannels: Int (number of the image channels)
* - mode: Int (OpenCV-compatible type)
* - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
*
* To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto on "IMAGE"

* optionally specify the data source options, for example:
* {{{
* // Scala
* val df = spark.read.format("image")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very cool!

* .option("dropImageFailures", "true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true as a boolean value, please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

option API require (k: String, v:String) parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really? What about option(key: String, value: Boolean): DataFrameReader then? There are more --> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

* .load("data/mllib/images/partitioned")
*
* // Java
* Dataset<Row> df = spark.read().format("image")
* .option("dropImageFailures", "true")
* .load("data/mllib/images/partitioned");
* }}}
*
* IMAGE data source supports the following options:
* - "dropImageFailures": Whether to drop the files that are not valid images from the result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing dropImageFailures to dropInvalid?

*
* @note This IMAGE data source does not support saving images to files.
*
* @note This class is public for documentation purpose. Please don't use this class directly.
* Rather, use the data source API as illustrated above.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WeichenXu123, don't we plan to make a documentation in the site?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see a section in the doc that lists all built-in data sources. It would be nice if we create a section and link it to this API doc. I think we can do it with a follow-up PR. I want to see if we can get this PR merged before branch cut:)

*/
class ImageDataSource private() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for doc.
similar to LibSVMDataSource

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a convention? AFAIK in the scala world we usually put document in package object.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just simply remove this and make a followup for the doc in the site ..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: @cloud-fan The Scala package doc doesn't work for Java, which requires a different format.

Re: @HyukjinKwon It would be nice to have some doc in the site, though I didn't find the list of built-in data sources in the doc site. I think it is okay to have docs in both locations for IDE users and for people search on the web.

Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.source.image

import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.ml.image.ImageSchema
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeRow}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {

override def inferSchema(
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = Some(ImageSchema.imageSchema)

override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
throw new UnsupportedOperationException("Write is not supported for image data source")
}

override def shortName(): String = "image"

override protected def buildReader(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, is there any way we could combine the two apis? I don't like having to support two different implementations. Or, what is the issue that is blocking us from combining them?

sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the sampling option be ported as well? It seemed like an important option in case users didn't want to load all images.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't be addressed in this PR. The best way to support it is to allow data source handle sampling operation. cc @cloud-fan

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sample pushdown should be supported by data source v2 in the next release, then we can migrate the image source to data source v2 at that time.

assert(
requiredSchema.length <= 1,
"Image data source only produces a single data column named \"image\".")

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

val imageSourceOptions = new ImageOptions(options)

(file: PartitionedFile) => {
val emptyUnsafeRow = new UnsafeRow(0)
if (!imageSourceOptions.dropImageFailures && requiredSchema.isEmpty) {
Iterator(emptyUnsafeRow)
} else {
val origin = file.filePath
val path = new Path(origin)
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val stream = fs.open(path)
val bytes = try {
ByteStreams.toByteArray(stream)
} finally {
Closeables.close(stream, true)
}
val resultOpt = ImageSchema.decode(origin, bytes)
val filteredResult = if (imageSourceOptions.dropImageFailures) {
resultOpt.toIterator
} else {
Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin)))
}

if (requiredSchema.isEmpty) {
filteredResult.map(_ => emptyUnsafeRow)
} else {
val converter = RowEncoder(requiredSchema)
filteredResult.map(row => converter.toRow(row))
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.source.image

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

private[image] class ImageOptions(
@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {

def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why false is a String not a boolean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because parameters is Map[String, String] type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add ScalaDoc.

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.types._

class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
// Single column of images named "image"
private lazy val imagePath = "../data/mllib/images"
private lazy val imagePath = "../data/mllib/images/origin"

test("Smoke test: create basic ImageSchema dataframe") {
val origin = "path"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.source.image

import java.nio.file.Paths

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.image.ImageSchema._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, substring_index}

class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {

// Single column of images named "image"
private lazy val imagePath = "../data/mllib/images/partitioned"

test("image datasource count test") {
val df1 = spark.read.format("image").load(imagePath)
assert(df1.count === 9)

val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true as a boolean value, please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

assert(df2.count === 8)
}

test("image datasource test: read jpg image") {
val df = spark.read.format("image").load(imagePath + "/cls=kittens/date=2018-02/DP153539.jpg")
assert(df.count() === 1)
}

test("image datasource test: read png image") {
val df = spark.read.format("image").load(imagePath + "/cls=multichannel/date=2018-01/BGRA.png")
assert(df.count() === 1)
}

test("image datasource test: read non image") {
val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt"
val df = spark.read.format("image").option("dropImageFailures", "true")
.load(filePath)
assert(df.count() === 0)

val df2 = spark.read.format("image").option("dropImageFailures", "false")
.load(filePath)
assert(df2.count() === 1)
val result = df2.head()
assert(result === invalidImageRow(
Paths.get(filePath).toAbsolutePath().normalize().toUri().toString))
}

test("image datasource partition test") {
val result = spark.read.format("image")
.option("dropImageFailures", "true").load(imagePath)
.select(substring_index(col("image.origin"), "/", -1).as("origin"), col("cls"), col("date"))
.collect()

assert(Set(result: _*) === Set(
Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
Row("54893.jpg", "kittens", "2018-02"),
Row("DP153539.jpg", "kittens", "2018-02"),
Row("DP802813.jpg", "kittens", "2018-02"),
Row("BGRA.png", "multichannel", "2018-01"),
Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
Row("chr30.4.184.jpg", "multichannel", "2018-02"),
Row("grayscale.jpg", "multichannel", "2018-02")
))
}

// Images with the different number of channels
test("readImages pixel values test") {
val images = spark.read.format("image").option("dropImageFailures", "true")
.load(imagePath + "/cls=multichannel/").collect()

val firstBytes20Set = images.map { rrow =>
val row = rrow.getAs[Row]("image")
val filename = Paths.get(getOrigin(row)).getFileName().toString()
val mode = getMode(row)
val bytes20 = getData(row).slice(0, 20).toList
filename -> Tuple2(mode, bytes20)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is Tuple2 required here? Wouldn't (mode, bytes20) work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, (mode, bytes20) doesn't work, filename -> (mode, bytes20) will be compiled as filename.->(mode, bytes20) and -> receive 2 arguments and compile error occurs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It is useful to leave an inline comment here:)

}.toSet

assert(firstBytes20Set === expectedFirstBytes20Set)
}

// number of channels and first 20 bytes of OpenCV representation
// - default representation for 3-channel RGB images is BGR row-wise:
// (B00, G00, R00, B10, G10, R10, ...)
// - default representation for 4-channel RGB images is BGRA row-wise:
// (B00, G00, R00, A00, B10, G10, R10, A10, ...)
private val expectedFirstBytes20Set = Set(
"grayscale.jpg" ->
((0, List[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62,
-57, -60, -63, -53, -49, -55, -69))),
"chr30.4.184.jpg" -> ((16,
List[Byte](-9, -3, -1, -43, -32, -28, -75, -60, -57, -78, -59, -56, -74, -59, -57,
-71, -58, -56, -73, -64))),
"BGRA.png" -> ((24,
List[Byte](-128, -128, -8, -1, -128, -128, -8, -1, -128,
-128, -8, -1, 127, 127, -9, -1, 127, 127, -9, -1))),
"BGRA_alpha_60.png" -> ((24,
List[Byte](-128, -128, -8, 60, -128, -128, -8, 60, -128,
-128, -8, 60, 127, 127, -9, 60, 127, 127, -9, 60)))
)
}