Closed
Changes from 1 commit

Commits (35):
1d6b718 - continuous shuffle read RDD (jose-torres, May 15, 2018)
b5d1008 - docs (jose-torres, May 17, 2018)
af40769 - Merge remote-tracking branch 'apache/master' into readerRddMaster (jose-torres, May 17, 2018)
46456dc - fix ctor (jose-torres, May 17, 2018)
2ea8a6f - multiple partition test (jose-torres, May 17, 2018)
955ac79 - unset task context after test (jose-torres, May 17, 2018)
8cefb72 - conf from RDD (jose-torres, May 18, 2018)
f91bfe7 - endpoint name (jose-torres, May 18, 2018)
2590292 - testing bool (jose-torres, May 18, 2018)
859e6e4 - tests (jose-torres, May 18, 2018)
b23b7bb - take instead of poll (jose-torres, May 18, 2018)
97f7e8f - add interface (jose-torres, May 18, 2018)
de21b1c - clarify comment (jose-torres, May 18, 2018)
7dcf51a - multiple (jose-torres, May 18, 2018)
ad0b5aa - writer with 1 reader partition (jose-torres, May 25, 2018)
c9adee5 - docs and iface (jose-torres, May 25, 2018)
63d38d8 - Merge remote-tracking branch 'apache/master' into writerTask (jose-torres, May 25, 2018)
331f437 - increment epoch (jose-torres, May 25, 2018)
f3ce675 - undo oop (jose-torres, May 25, 2018)
e0108d7 - make rdd loop (jose-torres, May 25, 2018)
f400651 - remote write RDD (jose-torres, May 25, 2018)
1aaad8d - rename classes (jose-torres, May 25, 2018)
59890d4 - combine suites (jose-torres, May 25, 2018)
af1508c - fully rm old suite (jose-torres, May 25, 2018)
65837ac - reorder tests (jose-torres, May 29, 2018)
a68fae2 - return future (jose-torres, May 31, 2018)
98d55e4 - finish getting rid of old name (jose-torres, May 31, 2018)
e6b9118 - synchronous (jose-torres, May 31, 2018)
629455b - finish rename (jose-torres, May 31, 2018)
cb6d42b - add timeouts (jose-torres, Jun 13, 2018)
59d6ff7 - unalign (jose-torres, Jun 13, 2018)
f90388c - add note (jose-torres, Jun 13, 2018)
4bbdeae - parallel (jose-torres, Jun 13, 2018)
e57531d - fix compile (jose-torres, Jun 13, 2018)
cff37c4 - fix compile (jose-torres, Jun 13, 2018)
Commit c9adee5423c2e8a030911008d2e6942045d484bb: docs and iface (jose-torres committed May 25, 2018)
@@ -20,9 +20,15 @@ package org.apache.spark.sql.execution.streaming.continuous.shuffle
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

/**
*
* @param prev The RDD to write to the continuous shuffle.
* @param outputPartitioner The partitioner on the reader side of the shuffle.
* @param endpoints The [[UnsafeRowReceiver]] endpoints to write to. Indexed by partition ID within
* outputPartitioner.
*/
class ContinuousShuffleWriteRDD(
var prev: RDD[UnsafeRow],
outputPartitioner: Partitioner,
@@ -32,7 +38,8 @@ class ContinuousShuffleWriteRDD(
override def getPartitions: Array[Partition] = prev.partitions

override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
val writer = new ContinuousShuffleWriter(split.index, outputPartitioner, endpoints)
val writer: ContinuousShuffleWriter =
new UnsafeRowWriter(split.index, outputPartitioner, endpoints.toArray)
writer.write(prev.compute(split, context))

Iterator()
Contributor:
Seems like you don't really need an RDD here; you just need an action. You are consuming an iterator and returning nothing, which is exactly what rdd.foreachPartition does. It may be that wrapping it in this RDD is cleaner in the bigger picture, but I am unable to judge without having the bigger picture in mind (bigger picture = how these Continuous*RDDs are going to be created by the SQL SparkPlan, and executed).

Contributor (author):
I honestly just did this to mirror ContinuousWriteRDD, which itself mirrored WriteToDataSourceV2Exec returning an empty RDD. We can take it out of the current PR - it's not being used anywhere yet, and I agree that where it ends up being used will determine the right interface.
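To make the rdd.foreachPartition point above concrete, here is a minimal sketch (not code from this PR) of the same work expressed as an action. writeAsAction is a hypothetical helper, and it assumes outputPartitioner and endpoints serialize into the task closure:

import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical helper: the same work as ContinuousShuffleWriteRDD.compute,
// expressed as a plain action instead of a dedicated RDD.
def writeAsAction(
    prev: RDD[UnsafeRow],
    outputPartitioner: Partitioner,
    endpoints: Seq[RpcEndpointRef]): Unit = {
  prev.foreachPartition { iter =>
    // TaskContext.getPartitionId() stands in for split.index in the RDD version.
    val writer: ContinuousShuffleWriter =
      new UnsafeRowWriter(TaskContext.getPartitionId(), outputPartitioner, endpoints.toArray)
    writer.write(iter)
  }
}

Whether this or the RDD wrapper is preferable depends on how the surrounding SparkPlan ends up invoking it, as noted in the discussion above.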
@@ -17,30 +17,11 @@

package org.apache.spark.sql.execution.streaming.continuous.shuffle

import org.apache.spark.Partitioner
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

class ContinuousShuffleWriter(
writerId: Int,
outputPartitioner: Partitioner,
endpoints: Seq[RpcEndpointRef]) {

if (outputPartitioner.numPartitions != 1) {
throw new IllegalArgumentException("multiple readers not yet supported")
}

if (outputPartitioner.numPartitions != endpoints.size) {
throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
s"not match endpoint count ${endpoints.size}")
}

def write(epoch: Iterator[UnsafeRow]): Unit = {
while (epoch.hasNext) {
val row = epoch.next()
endpoints(outputPartitioner.getPartition(row)).ask[Unit](ReceiverRow(writerId, row))
}

endpoints.foreach(_.ask[Unit](ReceiverEpochMarker(writerId)))
}
/**
* Trait for writing to a continuous processing shuffle.
*/
trait ContinuousShuffleWriter {
def write(epoch: Iterator[UnsafeRow]): Unit
Contributor:
I don't think this is the right interface. The ContinuousShuffleWriter interface should be for writing the shuffled rows. The implementation should not be responsible for actually deciding partitions (i.e. outputPartitioner.getPartition(row)), as you don't want to re-implement the partitioning in every implementation. So I think the interface should be def write(row: UnsafeRow, partitionId: Int).

Contributor (author):
I think it's better encapsulation to re-implement the partitioning in every ContinuousShuffleWriter implementation than to re-implement it in every ContinuousShuffleWriter user. (Note that the non-continuous ShuffleWriter has precedent for this: it uses the same interface, and all implementations of ShuffleWriter do re-implement partitioning.)

Contributor:
I see. That's fair.
}
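For comparison, here is a rough sketch of the per-row interface shape proposed above versus the epoch-based trait the PR keeps. PerRowContinuousShuffleWriter and drainEpoch are illustrative names only, not code from this PR:

import org.apache.spark.Partitioner
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Reviewer's proposed shape (hypothetical name): the caller picks the target partition.
trait PerRowContinuousShuffleWriter {
  def write(row: UnsafeRow, partitionId: Int): Unit
}

// With that shape, every caller repeats the partitioning step that the PR
// instead keeps inside each ContinuousShuffleWriter implementation.
def drainEpoch(
    epoch: Iterator[UnsafeRow],
    outputPartitioner: Partitioner,
    writer: PerRowContinuousShuffleWriter): Unit = {
  epoch.foreach(row => writer.write(row, outputPartitioner.getPartition(row)))
}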
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.continuous.shuffle

import org.apache.spark.Partitioner
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

/**
* A [[ContinuousShuffleWriter]] sending data to [[UnsafeRowReceiver]] instances.
Contributor:
Another thought, not something that needs to be done now. But it might be overall cleaner if the ContinuousShuffleWriter and Reader are coupled together in a joint interface. This is because each writer implementation is always tied to a specific reader implementation, so they are always coupled together. Consider something like this:

trait ContinuousShuffleManager {
  def createWriter(writerId: Int, numReaders: Int): ContinuousShuffleWriter
  def createReader(readerId: Int, numWriters: Int): ContinuousShuffleReader
}

I am just guessing at the params on the createX interfaces; I might be missing something. But I feel that a small set of params should be sufficient for any implementation to figure out everything else. Also, other management/control layer stuff would go into the manager implementation. For example, if the writers and readers need to exchange initial setup information (e.g. RPC endpoint details) through the driver, then the implementation of that would go into the manager.

Think about it as you're building out the rest of the architecture.

Contributor (author):
Agreed.
*
* @param writerId The partition ID of this writer.
* @param outputPartitioner The partitioner on the reader side of the shuffle.
* @param endpoints The [[UnsafeRowReceiver]] endpoints to write to. Indexed by partition ID within
* outputPartitioner.
*/
class UnsafeRowWriter(
Contributor:
Looking at this PR and prev PRs, I think the names UnsafeRowWriter and UnsafeRowReader are not right. The basic interfaces ContinuousShuffleReader/Writer take UnsafeRows, hence that's not unique to this implementation (that is, all implementations of these interfaces will read/write UnsafeRows). What's unique is that this implementation uses an RPC mechanism to read/write. So it may be more accurate to name them RPCContinuousShuffleReader/Writer, or something like that.

writerId: Int,
outputPartitioner: Partitioner,
endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter {

if (outputPartitioner.numPartitions != 1) {
throw new IllegalArgumentException("multiple readers not yet supported")
}

if (outputPartitioner.numPartitions != endpoints.length) {
throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
s"not match endpoint count ${endpoints.length}")
}

def write(epoch: Iterator[UnsafeRow]): Unit = {
while (epoch.hasNext) {
val row = epoch.next()
endpoints(outputPartitioner.getPartition(row)).ask[Unit](ReceiverRow(writerId, row))
}

endpoints.foreach(_.ask[Unit](ReceiverEpochMarker(writerId)))
}
}
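As a wrap-up of the ContinuousShuffleManager suggestion from the review thread above, here is one way the RPC-based writer might be wired into such a joint interface. This is a hedged sketch only: it assumes the ContinuousShuffleManager trait sketched in the review comment exists, RpcContinuousShuffleManager is a hypothetical name, and ContinuousShuffleReader is assumed from the reader-side work rather than from this diff:

import org.apache.spark.Partitioner
import org.apache.spark.rpc.RpcEndpointRef

// Hypothetical manager pairing the RPC-based writer with its matching reader.
class RpcContinuousShuffleManager(
    outputPartitioner: Partitioner,
    endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleManager {

  override def createWriter(writerId: Int, numReaders: Int): ContinuousShuffleWriter =
    new UnsafeRowWriter(writerId, outputPartitioner, endpoints)

  // The reader side is outside this diff; left as a stub in this sketch.
  override def createReader(readerId: Int, numWriters: Int): ContinuousShuffleReader = ???
}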