From 5a44a5ddbf9b87477bb61dfbc164dfa5884cdba3 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 25 Feb 2019 15:59:15 -0800 Subject: [PATCH 1/4] introducing new shuffle writerAPI --- .../spark/api/shuffle/ShuffleDataIO.java | 23 +++++++++++++++ .../shuffle/ShuffleExecutorComponents.java | 25 ++++++++++++++++ .../api/shuffle/ShuffleMapOutputWriter.java | 29 +++++++++++++++++++ .../api/shuffle/ShufflePartitionWriter.java | 28 ++++++++++++++++++ .../api/shuffle/ShuffleWriteSupport.java | 29 +++++++++++++++++++ 5 files changed, 134 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java new file mode 100644 index 000000000000..46f1f1c12209 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +public interface ShuffleDataIO { + + ShuffleExecutorComponents executor(); +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java new file mode 100644 index 000000000000..4fadcc23e933 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +public interface ShuffleExecutorComponents { + + void intitializeExecutor(String appId, String execId); + + ShuffleWriteSupport writes(); +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java new file mode 100644 index 000000000000..39e2c42bf258 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; + +public interface ShuffleMapOutputWriter { + + ShufflePartitionWriter getNextPartitionWriter() throws IOException; + + void commitAllPartitions() throws IOException; + + void abort(Throwable error) throws IOException; +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java new file mode 100644 index 000000000000..af862ce58d54 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; +import java.io.OutputStream; + +public interface ShufflePartitionWriter { + + OutputStream openStream() throws IOException; + + long getLength(); +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java new file mode 100644 index 000000000000..b8a11f564b40 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +import java.io.IOException; + +public interface ShuffleWriteSupport { + + ShuffleMapOutputWriter createMapOutputWriter( + String appId, + int shuffleId, + int mapId, + int numPartitions) throws IOException; +} From 40fb1c3cafb37e511cc07100c9d4205eb3e3ac2c Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 26 Feb 2019 16:02:00 -0800 Subject: [PATCH 2/4] adding openChannel() to default --- .../apache/spark/api/shuffle/ShuffleExecutorComponents.java | 4 ++-- .../apache/spark/api/shuffle/ShufflePartitionWriter.java | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java index 4fadcc23e933..7efa5602f085 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -19,7 +19,7 @@ public interface ShuffleExecutorComponents { - void intitializeExecutor(String appId, String execId); + void intitializeExecutor(String appId, String execId); - ShuffleWriteSupport writes(); + ShuffleWriteSupport writes(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java index af862ce58d54..10a8573a9f52 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -19,10 +19,16 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; public interface ShufflePartitionWriter { OutputStream openStream() throws IOException; long getLength(); + + default WritableByteChannel openChannel() throws IOException { + return Channels.newChannel(openStream()); + } } From 8f3e6e6dd04fcadc0bdcfb8256593e38b895e4e9 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 13 Mar 2019 16:33:02 -0700 Subject: [PATCH 3/4] added experimental --- .../org/apache/spark/api/shuffle/ShuffleDataIO.java | 10 +++++++++- .../spark/api/shuffle/ShuffleExecutorComponents.java | 9 +++++++++ .../spark/api/shuffle/ShuffleMapOutputWriter.java | 9 +++++++++ .../spark/api/shuffle/ShufflePartitionWriter.java | 9 +++++++++ .../apache/spark/api/shuffle/ShuffleWriteSupport.java | 9 +++++++++ 5 files changed, 45 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java index 46f1f1c12209..25bf0ce4394b 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -17,7 +17,15 @@ package org.apache.spark.api.shuffle; -public interface ShuffleDataIO { +import org.apache.spark.annotation.Experimental; +/** + * :: Experimental :: + * An interface for launching Shuffle related components + * + * @since 3.0.0 + */ +@Experimental +public interface ShuffleDataIO { ShuffleExecutorComponents executor(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java index 7efa5602f085..0d0d96ebcd55 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -17,6 +17,15 @@ package org.apache.spark.api.shuffle; +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for building shuffle support for Executors + * + * @since 3.0.0 + */ +@Experimental public interface ShuffleExecutorComponents { void intitializeExecutor(String appId, String execId); diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java index 39e2c42bf258..fc0472609fa6 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java @@ -19,6 +19,15 @@ import java.io.IOException; +import org.apache.spark.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for creating and managing shuffle partition writers + * + * @since 3.0.0 + */ +@Experimental public interface ShuffleMapOutputWriter { ShufflePartitionWriter getNextPartitionWriter() throws IOException; diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java index 10a8573a9f52..72d20f4dd746 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -22,6 +22,15 @@ import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import org.apache.http.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for giving streams / channels for shuffle writes + * + * @since 3.0.0 + */ +@Experimental public interface ShufflePartitionWriter { OutputStream openStream() throws IOException; diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java index b8a11f564b40..f50d829a0a8b 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -19,6 +19,15 @@ import java.io.IOException; +import org.apache.http.annotation.Experimental; + +/** + * :: Experimental :: + * An interface for deploying a shuffle map output writer + * + * @since 3.0.0 + */ +@Experimental public interface ShuffleWriteSupport { ShuffleMapOutputWriter createMapOutputWriter( From b3223393510bce49b1c987e870a4ed19ccec6834 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 19 Mar 2019 11:37:20 -0700 Subject: [PATCH 4/4] modified spacing --- .../org/apache/spark/api/shuffle/ShuffleDataIO.java | 2 +- .../spark/api/shuffle/ShuffleExecutorComponents.java | 5 ++--- .../spark/api/shuffle/ShuffleMapOutputWriter.java | 7 +++---- .../spark/api/shuffle/ShufflePartitionWriter.java | 11 +++++------ .../apache/spark/api/shuffle/ShuffleWriteSupport.java | 11 +++++------ 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java index 25bf0ce4394b..4cb40f6dd00b 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDataIO.java @@ -27,5 +27,5 @@ */ @Experimental public interface ShuffleDataIO { - ShuffleExecutorComponents executor(); + ShuffleExecutorComponents executor(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java index 0d0d96ebcd55..1edf044225cc 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleExecutorComponents.java @@ -27,8 +27,7 @@ */ @Experimental public interface ShuffleExecutorComponents { + void intitializeExecutor(String appId, String execId); - void intitializeExecutor(String appId, String execId); - - ShuffleWriteSupport writes(); + ShuffleWriteSupport writes(); } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java index fc0472609fa6..5119e34803a8 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java @@ -29,10 +29,9 @@ */ @Experimental public interface ShuffleMapOutputWriter { + ShufflePartitionWriter getNextPartitionWriter() throws IOException; - ShufflePartitionWriter getNextPartitionWriter() throws IOException; + void commitAllPartitions() throws IOException; - void commitAllPartitions() throws IOException; - - void abort(Throwable error) throws IOException; + void abort(Throwable error) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java index 72d20f4dd746..c043a6b3a499 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShufflePartitionWriter.java @@ -32,12 +32,11 @@ */ @Experimental public interface ShufflePartitionWriter { + OutputStream openStream() throws IOException; - OutputStream openStream() throws IOException; + long getLength(); - long getLength(); - - default WritableByteChannel openChannel() throws IOException { - return Channels.newChannel(openStream()); - } + default WritableByteChannel openChannel() throws IOException { + return Channels.newChannel(openStream()); + } } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java index f50d829a0a8b..5ba5564bb46d 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -29,10 +29,9 @@ */ @Experimental public interface ShuffleWriteSupport { - - ShuffleMapOutputWriter createMapOutputWriter( - String appId, - int shuffleId, - int mapId, - int numPartitions) throws IOException; + ShuffleMapOutputWriter createMapOutputWriter( + String appId, + int shuffleId, + int mapId, + int numPartitions) throws IOException; }