@@ -1,2 +1,2 @@
 org.apache.spark.ml.source.libsvm.LibSVMFileFormat
 org.apache.spark.ml.source.image.ImageFileFormat

@@ -29,7 +29,7 @@ package org.apache.spark.ml.source.image
  * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases)
  *
  * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and

Contributor: ditto on "IMAGE"

- * optionally specify options, for example:
+ * optionally specify the datasource options, for example:

Contributor: s/datasource/data source

  * {{{
  * // Scala
  * val df = spark.read.format("image")

Contributor: this is very cool!

@@ -45,6 +45,8 @@ package org.apache.spark.ml.source.image
  * IMAGE data source supports the following options:
  * - "dropImageFailures": Whether to drop the files that are not valid images from the result.

Contributor: How about changing `dropImageFailures` to `dropInvalid`?
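
For reference, a minimal sketch of the reader-side usage this doc comment covers, using the option name as of this commit (the load path is borrowed from the test suites further down):

    // Read a directory of images, dropping any files that fail to decode.
    val df = spark.read.format("image")
      .option("dropImageFailures", "true")
      .load("../data/mllib/images/origin")
    df.select("image.origin", "image.height", "image.width", "image.mode").show()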

  *
  * @note This IMAGE data source does not support "write".

Contributor: s/"write"/saving images to a file(s)/ ?

  *
  * @note This class is public for documentation purpose. Please don't use this class directly.
  * Rather, use the data source API as illustrated above.

Member: @WeichenXu123, don't we plan to add documentation for this on the site?

Contributor: I didn't see a section in the docs that lists all built-in data sources. It would be nice to create one and link it to this API doc; I think we can do that in a follow-up PR. I want to see if we can get this PR merged before the branch cut :)

  */

@@ -33,15 +33,6 @@ import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-
-private[image] class ImageFileFormatOptions(
-    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
-
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
-
-  val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean
-}
-
 private[image] class ImageFileFormat extends FileFormat with DataSourceRegister {
 
   override def inferSchema(

@@ -53,8 +44,7 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
       sparkSession: SparkSession,
       job: Job, options: Map[String, String],

Contributor: New line after `job: Job`

       dataSchema: StructType): OutputWriterFactory = {
-    throw new UnsupportedOperationException(
-      s"prepareWrite is not supported for image data source")
+    throw new UnsupportedOperationException("Write is not supported for image data source")
   }

   override def shortName(): String = "image"
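
As a sanity check of the behavior this hunk simplifies, a hypothetical ScalaTest-style snippet (intercept comes from ScalaTest; the paths are illustrative):

    // Any write through the image format reaches prepareWrite and throws.
    val df = spark.read.format("image").load("../data/mllib/images/origin")
    val e = intercept[UnsupportedOperationException] {
      df.write.format("image").save("/tmp/images-out")
    }
    assert(e.getMessage.contains("Write is not supported for image data source"))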

@@ -74,7 +64,7 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    val imageSourceOptions = new ImageFileFormatOptions(options)
+    val imageSourceOptions = new ImageOptions(options)
 
     (file: PartitionedFile) => {
       val emptyUnsafeRow = new UnsafeRow(0)

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.source.image
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+private[image] class ImageOptions(
+    @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean

Contributor: Why is "false" a String and not a Boolean?

Contributor (author): Because `parameters` is of type `Map[String, String]`.

Contributor: Should add ScalaDoc.

+}
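
To make the exchange above concrete, a standalone sketch of the pattern in plain Scala (no Spark required): options reach a file format as a string-to-string map, so the default is the String "false", parsed at the point of use:

    // Both stored values and defaults are Strings in a Map[String, String];
    // toBoolean converts when the option is read.
    val parameters: Map[String, String] = Map("dropImageFailures" -> "true")
    val dropImageFailures: Boolean =
      parameters.getOrElse("dropImageFailures", "false").toBoolean
    assert(dropImageFailures)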

@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
 
 class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext {
   // Single column of images named "image"
-  private lazy val imagePath = "../data/mllib/images/images"
+  private lazy val imagePath = "../data/mllib/images/origin"
 
   test("Smoke test: create basic ImageSchema dataframe") {
     val origin = "path"

@@ -28,7 +28,7 @@ import org.apache.spark.sql.functions.{col, substring_index}
 class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   // Single column of images named "image"
-  private lazy val imagePath = "../data/mllib/images/imagesWithPartitions"
+  private lazy val imagePath = "../data/mllib/images/partitioned"
 
   test("image datasource count test") {
     val df1 = spark.read.format("image").load(imagePath)
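
The renamed directory follows Hive-style partitioning (note the cls=multichannel path segment in the next hunk), so partition discovery exposes a cls column. A hypothetical sketch of the layout and its effect:

    // Illustrative layout:
    //   ../data/mllib/images/partitioned/cls=multichannel/<image files>
    //   ../data/mllib/images/partitioned/cls=<other>/<image files>
    val df = spark.read.format("image").load("../data/mllib/images/partitioned")
    df.groupBy("cls").count().show()  // one row per discovered partition value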

@@ -82,27 +82,26 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   // Images with the different number of channels
   test("readImages pixel values test") {
-
     val images = spark.read.format("image").option("dropImageFailures", "true")
       .load(imagePath + "/cls=multichannel/").collect()
 
-    val firstBytes20Map = images.map { rrow =>
+    val firstBytes20Set = images.map { rrow =>
       val row = rrow.getAs[Row]("image")
       val filename = Paths.get(getOrigin(row)).getFileName().toString()
       val mode = getMode(row)
       val bytes20 = getData(row).slice(0, 20).toList
       filename -> Tuple2(mode, bytes20)

Contributor: Why is Tuple2 required here? Wouldn't (mode, bytes20) work?

Contributor (author): (mode, bytes20) doesn't work here: filename -> (mode, bytes20) is compiled as filename.->(mode, bytes20), so -> receives two arguments and a compile error occurs.

Contributor: nit: It would be useful to leave an inline comment here :)
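
A standalone sketch of the parsing pitfall the author describes (the values are placeholders):

    val filename = "grayscale.jpg"
    val mode = 0
    val bytes20 = List[Byte](-2, -33, -61)

    // filename -> (mode, bytes20) is compiled as filename.->(mode, bytes20):
    // the parenthesized pair becomes a two-argument list, but -> takes exactly
    // one argument, so the call does not compile as intended.
    val entry = filename -> Tuple2(mode, bytes20)  // explicit Tuple2 works
    val entry2 = filename -> ((mode, bytes20))     // extra parentheses force a single tuple argument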

-    }.toMap
+    }.toSet
 
-    assert(firstBytes20Map === expectedFirstBytes20Map)
+    assert(firstBytes20Set === expectedFirstBytes20Set)
   }

   // number of channels and first 20 bytes of OpenCV representation
   // - default representation for 3-channel RGB images is BGR row-wise:
   //   (B00, G00, R00, B10, G10, R10, ...)
   // - default representation for 4-channel RGB images is BGRA row-wise:
   //   (B00, G00, R00, A00, B10, G10, R10, A10, ...)
-  private val expectedFirstBytes20Map = Map(
+  private val expectedFirstBytes20Set = Set(
     "grayscale.jpg" ->
       ((0, List[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62,
         -57, -60, -63, -53, -49, -55, -69))),

@@ -592,7 +592,6 @@ object DataSource extends Logging {
       "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc,
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
-      "org.apache.spark.ml.source.image.ImageFileFormat.DefaultSource" -> image,
       "org.apache.spark.ml.source.image.ImageFileFormat" -> image,
       "com.databricks.spark.csv" -> csv,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
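
For context, the map edited here is what keeps legacy fully qualified names usable; a hedged sketch of the effect (the load path is illustrative):

    // Both of these resolve to the same built-in image data source:
    val byShortName = spark.read.format("image")
      .load("../data/mllib/images/origin")
    val byClassName = spark.read.format("org.apache.spark.ml.source.image.ImageFileFormat")
      .load("../data/mllib/images/origin")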