-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-22666][ML][SQL] Spark datasource for image format #22328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5b5aee6
5164d19
bd6178c
4d52754
3fffd7e
3c8863c
218ce4c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| The images in the folder "kittens" are under the creative commons CC0 license, or no rights reserved: | ||
| https://creativecommons.org/share-your-work/public-domain/cc0/ | ||
| The images are taken from: | ||
| https://ccsearch.creativecommons.org/image/detail/WZnbJSJ2-dzIDiuUUdto3Q== | ||
| https://ccsearch.creativecommons.org/image/detail/_TlKu_rm_QrWlR0zthQTXA== | ||
| https://ccsearch.creativecommons.org/image/detail/OPNnHJb6q37rSZ5o_L5JHQ== | ||
| https://ccsearch.creativecommons.org/image/detail/B2CVP_j5KjwZm7UAVJ3Hvw== | ||
|
|
||
| The chr30.4.184.jpg and grayscale.jpg images are also under the CC0 license, taken from: | ||
| https://ccsearch.creativecommons.org/image/detail/8eO_qqotBfEm2UYxirLntw== | ||
|
|
||
| The image under "multi-channel" directory is under the CC BY-SA 4.0 license cropped from: | ||
| https://en.wikipedia.org/wiki/Alpha_compositing#/media/File:Hue_alpha_falloff.png |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| not an image |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| org.apache.spark.ml.source.libsvm.LibSVMFileFormat | ||
| org.apache.spark.ml.source.image.ImageFileFormat |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.ml.source.image | ||
|
|
||
| /** | ||
| * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. | ||
| * The loaded `DataFrame` has one `StructType` column: `image`. | ||
| * The schema of the `image` column is: | ||
| * - origin: String (represent the origin of image. If loaded from file, then it is file path) | ||
|
||
| * - height: Int (height of image) | ||
| * - width: Int (width of image) | ||
| * - nChannels: Int (number of image channels) | ||
| * - mode: Int (OpenCV-compatible type) | ||
| * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) | ||
| * | ||
| * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and | ||
|
||
| * optionally specify the datasource options, for example: | ||
|
||
| * {{{ | ||
| * // Scala | ||
| * val df = spark.read.format("image") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is very cool! |
||
| * .option("dropImageFailures", "true") | ||
|
||
| * .load("data/mllib/images/imagesWithPartitions") | ||
| * | ||
| * // Java | ||
| * Dataset<Row> df = spark.read().format("image") | ||
| * .option("dropImageFailures", "true") | ||
| * .load("data/mllib/images/imagesWithPartitions"); | ||
| * }}} | ||
| * | ||
| * IMAGE data source supports the following options: | ||
| * - "dropImageFailures": Whether to drop the files that are not valid images from the result. | ||
|
||
| * | ||
| * @note This IMAGE data source does not support "write". | ||
|
||
| * | ||
| * @note This class is public for documentation purpose. Please don't use this class directly. | ||
| * Rather, use the data source API as illustrated above. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @WeichenXu123, don't we plan to make a documentation in the site?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't see a section in the doc that lists all built-in data sources. It would be nice if we create a section and link it to this API doc. I think we can do it with a follow-up PR. I want to see if we can get this PR merged before branch cut:) |
||
| */ | ||
| class ImageDataSource private() {} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need this class?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for doc.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a convention? AFAIK in the scala world we usually put document in package object.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just simply remove this and make a followup for the doc in the site ..
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re: @cloud-fan The Scala package doc doesn't work for Java, which requires a different format. Re: @HyukjinKwon It would be nice to have some doc in the site, though I didn't find the list of built-in data sources in the doc site. I think it is okay to have docs in both locations for IDE users and for people search on the web. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.ml.source.image | ||
|
|
||
| import com.google.common.io.{ByteStreams, Closeables} | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.{FileStatus, Path} | ||
| import org.apache.hadoop.mapreduce.Job | ||
|
|
||
| import org.apache.spark.ml.image.ImageSchema | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.encoders.RowEncoder | ||
| import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeRow} | ||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
| import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriterFactory, PartitionedFile} | ||
| import org.apache.spark.sql.sources.{DataSourceRegister, Filter} | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.util.SerializableConfiguration | ||
|
|
||
| private[image] class ImageFileFormat extends FileFormat with DataSourceRegister { | ||
|
|
||
| override def inferSchema( | ||
| sparkSession: SparkSession, | ||
| options: Map[String, String], | ||
| files: Seq[FileStatus]): Option[StructType] = Some(ImageSchema.imageSchema) | ||
|
|
||
| override def prepareWrite( | ||
| sparkSession: SparkSession, | ||
| job: Job, options: Map[String, String], | ||
|
||
| dataSchema: StructType): OutputWriterFactory = { | ||
| throw new UnsupportedOperationException("Write is not supported for image data source") | ||
| } | ||
|
|
||
| override def shortName(): String = "image" | ||
|
|
||
| override protected def buildReader( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, is there any way we could combine the two apis? I don't like having to support two different implementations. Or, what is the issue that is blocking us from combining them? |
||
| sparkSession: SparkSession, | ||
| dataSchema: StructType, | ||
| partitionSchema: StructType, | ||
| requiredSchema: StructType, | ||
| filters: Seq[Filter], | ||
| options: Map[String, String], | ||
| hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should the sampling option be ported as well? It seemed like an important option in case users didn't want to load all images.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It won't be addressed in this PR. The best way to support it is to allow data source handle sampling operation. cc @cloud-fan
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sample pushdown should be supported by data source v2 in the next release, then we can migrate the image source to data source v2 at that time. |
||
| assert( | ||
| requiredSchema.length <= 1, | ||
| "Image data source only produces a single data column named \"image\".") | ||
|
|
||
| val broadcastedHadoopConf = | ||
| sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) | ||
|
|
||
| val imageSourceOptions = new ImageOptions(options) | ||
|
|
||
| (file: PartitionedFile) => { | ||
| val emptyUnsafeRow = new UnsafeRow(0) | ||
| if (!imageSourceOptions.dropImageFailures && requiredSchema.isEmpty) { | ||
| Iterator(emptyUnsafeRow) | ||
| } else { | ||
| val origin = file.filePath | ||
| val path = new Path(origin) | ||
| val fs = path.getFileSystem(broadcastedHadoopConf.value.value) | ||
| val stream = fs.open(path) | ||
| val bytes = try { | ||
| ByteStreams.toByteArray(stream) | ||
| } finally { | ||
| Closeables.close(stream, true) | ||
| } | ||
| val resultOpt = ImageSchema.decode(origin, bytes) | ||
| val filteredResult = if (imageSourceOptions.dropImageFailures) { | ||
| resultOpt.toIterator | ||
| } else { | ||
| Iterator(resultOpt.getOrElse(ImageSchema.invalidImageRow(origin))) | ||
| } | ||
|
|
||
| if (requiredSchema.isEmpty) { | ||
| filteredResult.map(_ => emptyUnsafeRow) | ||
| } else { | ||
| val converter = RowEncoder(requiredSchema) | ||
| filteredResult.map(row => converter.toRow(row)) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.ml.source.image | ||
|
|
||
| import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | ||
|
|
||
| private[image] class ImageOptions( | ||
| @transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { | ||
|
|
||
| def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) | ||
|
|
||
| val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean | ||
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.ml.source.image | ||
|
|
||
| import java.nio.file.Paths | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
| import org.apache.spark.ml.image.ImageSchema._ | ||
| import org.apache.spark.mllib.util.MLlibTestSparkContext | ||
| import org.apache.spark.sql.Row | ||
| import org.apache.spark.sql.functions.{col, substring_index} | ||
|
|
||
| class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { | ||
|
|
||
| // Single column of images named "image" | ||
| private lazy val imagePath = "../data/mllib/images/partitioned" | ||
|
|
||
| test("image datasource count test") { | ||
| val df1 = spark.read.format("image").load(imagePath) | ||
| assert(df1.count === 9) | ||
|
|
||
| val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath) | ||
|
||
| assert(df2.count === 8) | ||
| } | ||
|
|
||
| test("image datasource test: read jpg image") { | ||
| val df = spark.read.format("image").load(imagePath + "/cls=kittens/date=2018-02/DP153539.jpg") | ||
| assert(df.count() === 1) | ||
| } | ||
|
|
||
| test("image datasource test: read png image") { | ||
| val df = spark.read.format("image").load(imagePath + "/cls=multichannel/date=2018-01/BGRA.png") | ||
| assert(df.count() === 1) | ||
| } | ||
|
|
||
| test("image datasource test: read non image") { | ||
| val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt" | ||
| val df = spark.read.format("image").option("dropImageFailures", "true") | ||
| .load(filePath) | ||
| assert(df.count() === 0) | ||
|
|
||
| val df2 = spark.read.format("image").option("dropImageFailures", "false") | ||
| .load(filePath) | ||
| assert(df2.count() === 1) | ||
| val result = df2.head() | ||
| assert(result === invalidImageRow( | ||
| Paths.get(filePath).toAbsolutePath().normalize().toUri().toString)) | ||
| } | ||
|
|
||
| test("image datasource partition test") { | ||
| val result = spark.read.format("image") | ||
| .option("dropImageFailures", "true").load(imagePath) | ||
| .select(substring_index(col("image.origin"), "/", -1).as("origin"), col("cls"), col("date")) | ||
| .collect() | ||
|
|
||
| assert(Set(result: _*) === Set( | ||
| Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"), | ||
| Row("54893.jpg", "kittens", "2018-02"), | ||
| Row("DP153539.jpg", "kittens", "2018-02"), | ||
| Row("DP802813.jpg", "kittens", "2018-02"), | ||
| Row("BGRA.png", "multichannel", "2018-01"), | ||
| Row("BGRA_alpha_60.png", "multichannel", "2018-01"), | ||
| Row("chr30.4.184.jpg", "multichannel", "2018-02"), | ||
| Row("grayscale.jpg", "multichannel", "2018-02") | ||
| )) | ||
| } | ||
|
|
||
| // Images with the different number of channels | ||
| test("readImages pixel values test") { | ||
| val images = spark.read.format("image").option("dropImageFailures", "true") | ||
| .load(imagePath + "/cls=multichannel/").collect() | ||
|
|
||
| val firstBytes20Set = images.map { rrow => | ||
| val row = rrow.getAs[Row]("image") | ||
| val filename = Paths.get(getOrigin(row)).getFileName().toString() | ||
| val mode = getMode(row) | ||
| val bytes20 = getData(row).slice(0, 20).toList | ||
| filename -> Tuple2(mode, bytes20) | ||
|
||
| }.toSet | ||
|
|
||
| assert(firstBytes20Set === expectedFirstBytes20Set) | ||
| } | ||
|
|
||
| // number of channels and first 20 bytes of OpenCV representation | ||
| // - default representation for 3-channel RGB images is BGR row-wise: | ||
| // (B00, G00, R00, B10, G10, R10, ...) | ||
| // - default representation for 4-channel RGB images is BGRA row-wise: | ||
| // (B00, G00, R00, A00, B10, G10, R10, A10, ...) | ||
| private val expectedFirstBytes20Set = Set( | ||
| "grayscale.jpg" -> | ||
| ((0, List[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62, | ||
| -57, -60, -63, -53, -49, -55, -69))), | ||
| "chr30.4.184.jpg" -> ((16, | ||
| List[Byte](-9, -3, -1, -43, -32, -28, -75, -60, -57, -78, -59, -56, -74, -59, -57, | ||
| -71, -58, -56, -73, -64))), | ||
| "BGRA.png" -> ((24, | ||
| List[Byte](-128, -128, -8, -1, -128, -128, -8, -1, -128, | ||
| -128, -8, -1, 127, 127, -9, -1, 127, 127, -9, -1))), | ||
| "BGRA_alpha_60.png" -> ((24, | ||
| List[Byte](-128, -128, -8, 60, -128, -128, -8, 60, -128, | ||
| -128, -8, 60, 127, 127, -9, 60, 127, 127, -9, 60))) | ||
| ) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -567,6 +567,7 @@ object DataSource extends Logging { | |
| val parquet = classOf[ParquetFileFormat].getCanonicalName | ||
| val csv = classOf[CSVFileFormat].getCanonicalName | ||
| val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" | ||
| val image = "org.apache.spark.ml.source.image.ImageFileFormat" | ||
|
||
| val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" | ||
| val nativeOrc = classOf[OrcFileFormat].getCanonicalName | ||
| val socket = classOf[TextSocketSourceProvider].getCanonicalName | ||
|
|
@@ -591,6 +592,7 @@ object DataSource extends Logging { | |
| "org.apache.spark.sql.execution.datasources.orc" -> nativeOrc, | ||
| "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, | ||
| "org.apache.spark.ml.source.libsvm" -> libsvm, | ||
| "org.apache.spark.ml.source.image.ImageFileFormat" -> image, | ||
| "com.databricks.spark.csv" -> csv, | ||
| "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket, | ||
| "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"IMAGE" doesn't need to be all uppercase. Just say "loading images".