[SPARK-28191][SS] New data source - state - reader part
HeartSaVioR committed Dec 10, 2020
commit 7f2b74eaef983f8cab0c340efb507e09d6db7a85
@@ -5,6 +5,7 @@ org.apache.spark.sql.execution.datasources.noop.NoopDataSource
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2
@HyukjinKwon (Member), Apr 9, 2020:

I came here from the PR you pointed out. Why is it called state? Can a batch query use this source?

@HeartSaVioR (Contributor Author):

"state" is one of the terms of Structured Streaming (not actually tied to Structured Streaming, but to recent streaming technology in general). State is created and used by Structured Streaming, but there are cases where we want to modify the state "outside" of the streaming query, like changing the schema, repartitioning, etc. This data source will allow a batch query to do that. (So, by intention, the data source is not even designed to be used from a streaming query.)

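For illustration, a rough sketch of what a batch read could look like with this source, assuming an active SparkSession `spark`. The option names come from StateDataSourceV2 in this PR; the checkpoint path and the key/value schema below are hypothetical, since the actual schema depends on the stateful operator:

import org.apache.spark.sql.types._

val stateSchema = new StructType()
  .add("key", new StructType().add("groupKey", StringType))   // hypothetical key schema
  .add("value", new StructType().add("count", LongType))      // hypothetical value schema

val stateDf = spark.read
  .format("state")
  .schema(stateSchema)                                  // inferSchema is unsupported, so pass the schema explicitly
  .option("checkpointLocation", "/path/to/checkpoint")  // hypothetical checkpoint root
  .option("version", "10")                              // state version (batch) to read
  .option("operatorId", "0")
  .load()

stateDf.selectExpr("key.groupKey", "value.count").show()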
@HyukjinKwon (Member):

I see. So it's designed for batch queries over the state generated by Structured Streaming.
@HeartSaVioR, could I ask you to post a working example in the PR or JIRA description? I think one working example would clarify what this source/PR targets.

@HeartSaVioR (Contributor Author), Apr 10, 2020:

I will do that once I get an actual reviewer who is willing to shepherd this issue - so far the only request I got for this feature was to write an SPIP.

https://github.com/HeartSaVioR/spark-state-tools

The repository above contains the entire functionality (though it's tied to Spark 2.4 and has some awkward usage because Spark doesn't provide schema information) as well as an explanation.

@HyukjinKwon (Member):

I was thinking that showing an example could clarify the importance of this source easily, and hopefully we could get more reviews and attention. But okay, we can wait for the review first too.

org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{CommitLog, OffsetSeqLog, OffsetSeqMetadata}

/**
* Provides features to deal with checkpoints, like creating a savepoint.
*/
object CheckpointUtil {

/**
* Creates a savepoint from an existing checkpoint.
* OffsetLog and CommitLog will be purged based on newLastBatchId.
* Use `additionalMetadataConf` to modify the metadata configuration: you may want to modify it
* when rescaling state, or when migrating the state format version.
* e.g. when rescaling, pass Map(SQLConf.SHUFFLE_PARTITIONS.key -> newShufflePartitions.toString)
*
* @param sparkSession spark session
* @param checkpointRoot the root path of existing checkpoint
* @param newCheckpointRoot the root path of new savepoint - target directory should be empty
* @param newLastBatchId the new last batch ID - it needs to be one of the committed batch IDs
* @param additionalMetadataConf the configuration to add to existing metadata configuration
* @param excludeState whether to exclude state directory
*/
def createSavePoint(
sparkSession: SparkSession,
checkpointRoot: String,
newCheckpointRoot: String,
newLastBatchId: Long,
additionalMetadataConf: Map[String, String],
excludeState: Boolean = false): Unit = {
val hadoopConf = sparkSession.sessionState.newHadoopConf()

val src = new Path(resolve(hadoopConf, checkpointRoot))
val srcFs = src.getFileSystem(hadoopConf)
val dst = new Path(resolve(hadoopConf, newCheckpointRoot))
val dstFs = dst.getFileSystem(hadoopConf)

if (dstFs.listFiles(dst, false).hasNext) {
throw new IllegalArgumentException("Destination directory should be empty.")
}

dstFs.mkdirs(dst)

// copy content of src directory to dst directory
srcFs.listStatus(src).foreach { fs =>
val path = fs.getPath
val fileName = path.getName
if (fileName == "state" && excludeState) {
// skip copying the state directory
} else {
FileUtil.copy(srcFs, path, dstFs, new Path(dst, fileName),
false, false, hadoopConf)
}
}

val offsetLog = new OffsetSeqLog(sparkSession, new Path(dst, "offsets").toString)
val logForBatch = offsetLog.get(newLastBatchId) match {
case Some(log) => log
case None => throw new IllegalStateException("offset log for the batch should exist")
}

val newMetadata = logForBatch.metadata match {
case Some(md) =>
val newMap = md.conf ++ additionalMetadataConf
Some(md.copy(conf = newMap))
case None =>
Some(OffsetSeqMetadata(conf = additionalMetadataConf))
}

val newLogForBatch = logForBatch.copy(metadata = newMetadata)

// we will restart from last batch + 1: overwrite the last batch with new configuration
offsetLog.purgeAfter(newLastBatchId - 1)
offsetLog.add(newLastBatchId, newLogForBatch)

val commitLog = new CommitLog(sparkSession, new Path(dst, "commits").toString)
commitLog.purgeAfter(newLastBatchId)

// state doesn't expose a purge mechanism in its interface;
// assume state will overwrite batch files when it replays the previous batch
}

private def resolve(hadoopConf: Configuration, cpLocation: String): String = {
val checkpointPath = new Path(cpLocation)
val fs = checkpointPath.getFileSystem(hadoopConf)
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}
}
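A minimal usage sketch for createSavePoint, following its scaladoc: it assumes an active SparkSession `spark`, an existing checkpoint, an empty target directory, and a committed batch id (the paths and values below are hypothetical). Here the savepoint is rescaled to 20 shuffle partitions:

import org.apache.spark.sql.internal.SQLConf

CheckpointUtil.createSavePoint(
  spark,
  checkpointRoot = "/checkpoints/query-1",              // existing checkpoint (hypothetical path)
  newCheckpointRoot = "/checkpoints/query-1-rescaled",  // must point to an empty directory
  newLastBatchId = 42L,                                 // must be a committed batch id
  additionalMetadataConf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "20"),
  excludeState = false)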
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.types.{DataType, StructType}

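/** Helper to look up the data type of a top-level field in a schema by its name. */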
object SchemaUtil {
def getSchemaAsDataType(schema: StructType, fieldName: String): DataType = {
schema(schema.getFieldIndex(fieldName).get).dataType
}
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.state

import java.util

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.streaming.state.StateStoreId
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

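/**
* Data source v2 provider that lets a batch query read streaming state from a checkpoint.
* It is registered under the short name "state"; it requires the "checkpointLocation",
* "version" and "operatorId" options (with an optional "storeName"), and the schema must be
* specified explicitly by the caller.
*/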
class StateDataSourceV2 extends TableProvider with DataSourceRegister {

import StateDataSourceV2._

lazy val session = SparkSession.active

override def shortName(): String = "state"

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
val checkpointLocation = Option(properties.get(PARAM_CHECKPOINT_LOCATION)).orElse {
throw new AnalysisException(s"'$PARAM_CHECKPOINT_LOCATION' must be specified.")
}.get

val version = Option(properties.get(PARAM_VERSION)).map(_.toInt).orElse {
throw new AnalysisException(s"'$PARAM_VERSION' must be specified.")
}.get

val operatorId = Option(properties.get(PARAM_OPERATOR_ID)).map(_.toInt).orElse {
throw new AnalysisException(s"'$PARAM_OPERATOR_ID' must be specified.")
}.get

val storeName = Option(properties.get(PARAM_STORE_NAME))
.orElse(Some(StateStoreId.DEFAULT_STORE_NAME)).get

new StateTable(session, schema, checkpointLocation, version, operatorId, storeName)
}

override def inferSchema(options: CaseInsensitiveStringMap): StructType =
throw new UnsupportedOperationException("Schema should be explicitly specified.")

override def supportsExternalMetadata(): Boolean = true
}

object StateDataSourceV2 {
val PARAM_CHECKPOINT_LOCATION = "checkpointLocation"
val PARAM_VERSION = "version"
val PARAM_OPERATOR_ID = "operatorId"
val PARAM_STORE_NAME = "storeName"
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

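/**
* Partition reader which reads a single state store partition at the given version and returns
* each key-value pair as a two-column InternalRow of (key, value).
*/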
class StatePartitionReader(
storeConf: StateStoreConf,
hadoopConf: SerializableConfiguration,
partition: StateStoreInputPartition,
schema: StructType) extends PartitionReader[InternalRow] {

private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]

private lazy val iter = {
val stateStoreId = StateStoreId(partition.stateCheckpointRootLocation,
partition.operatorId, partition.partition, partition.storeName)
val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)

val store = StateStore.get(stateStoreProviderId, keySchema, valueSchema,
indexOrdinal = None, version = partition.version, storeConf = storeConf,
hadoopConf = hadoopConf.value)

store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
}

private var current: InternalRow = _

override def next(): Boolean = {
if (iter.hasNext) {
current = iter.next()
true
} else {
current = null
false
}
}

override def get(): InternalRow = current

override def close(): Unit = {
current = null
}

private def unifyStateRowPair(pair: (UnsafeRow, UnsafeRow)): InternalRow = {
val row = new GenericInternalRow(2)
row.update(0, pair._1)
row.update(1, pair._2)
row
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2.state

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.streaming.state.StateStoreConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

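/**
* Factory creating [[StatePartitionReader]] instances for [[StateStoreInputPartition]]s.
*/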
class StatePartitionReaderFactory(
storeConf: StateStoreConf,
hadoopConf: SerializableConfiguration,
schema: StructType) extends PartitionReaderFactory {

override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
val part = partition match {
case p: StateStoreInputPartition => p
case e => throw new IllegalStateException("Expected StateStoreInputPartition but another type of " +
s"partition was passed - $e")
}

new StatePartitionReader(storeConf, hadoopConf, part, schema)
}
}