
Conversation

@wbo4958 (Contributor) commented Feb 23, 2024

What changes were proposed in this pull request?

Support stage-level scheduling for PySpark connect DataFrame APIs (mapInPandas and mapInArrow).

Why are the changes needed?

#44852 added ResourceProfile support in mapInPandas/mapInArrow for SQL, so it's the right time to enable it for Spark Connect.

Does this PR introduce any user-facing change?

Yes. Users can pass a ResourceProfile to mapInPandas/mapInArrow through the Spark Connect PySpark client.
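For illustration, a minimal sketch of the new usage from a connect client (assuming a remote SparkSession is already available as `spark`, and the same positional signature shown in the manual tests below):

```python
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def echo(batches):
    # Identity function: yield each pandas.DataFrame batch unchanged.
    for batch in batches:
        yield batch

# Ask for 2 CPUs per task; executor settings fall back to the defaults.
rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(2)).build

df = spark.range(0, 100, 1, 6)
df.mapInPandas(echo, df.schema, False, rp).show()
```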

How was this patch tested?

Pass the CIs and manual tests.

Was this patch authored or co-authored using generative AI tooling?

No

@wbo4958 (Contributor, Author) commented Feb 26, 2024

Manual tests

The manual tests were conducted on a Spark Standalone cluster with a single worker that has 6 CPU cores.

With dynamic allocation disabled.

start-connect-server.sh --master spark://192.168.0.106:7077 \
  --jars jars/spark-connect_2.13-4.0.0-SNAPSHOT.jar \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=false

The above command starts the connect server and asks for 1 executor with 4 CPU cores; with the default task.cpus=1, the default task parallelism is 4 at a time.
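As a quick illustration of that arithmetic (plain Python, not a Spark API):

```python
def task_slots(executor_cores: int, task_cpus: int) -> int:
    # Spark runs as many tasks concurrently as fit on one executor.
    return executor_cores // task_cpus

assert task_slots(4, 1) == 4  # default profile: 4 tasks at a time
assert task_slots(4, 2) == 2  # task.cpus=2: 2 tasks at a time
assert task_slots(4, 3) == 1  # task.cpus=3: tasks run one by one
```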

Then launch the Spark Connect PySpark client with:

pyspark --remote "sc://localhost"

1. task.cpus=1

Test code:

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(1)
rp = ResourceProfileBuilder().require(treqs).build  # .build is a property, not a method
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()

When the required task.cpus=1 and executor.cores=4 (no executor resources specified, so the default executor is used), there will be 4 tasks running for rp at the same time.

The entire Spark application consists of a single Spark job that is divided into two stages. The first shuffle stage comprises 6 tasks: the first 4 run simultaneously, followed by the last 2.


The second ResultStage comprises 3 tasks, all of which will be executed simultaneously since the required task.cpus is 1.


2. task.cpus=2

Test code:

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()

When the required task.cpus=2 and executor.cores=4 (no executor resources specified, so the default executor is used), there will be 2 tasks running for rp at a time.

The first shuffle stage behaves the same as in the first case.

The second ResultStage comprises 3 tasks: the first 2 run at the same time, and then the last task executes.


3. task.cpus=3

Test code:

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()

When the required task.cpus=3 and executor.cores=4 (no executor resources specified, so the default executor is used), only 1 task runs for rp at a time.

The first shuffle stage behaves the same as in the first case.

The second ResultStage comprises 3 tasks, all of which run serially.


4. task.cpus=5

Test code:
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()

An exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/dataframe.py", line 1763, in collect
    table, schema = self._to_table()
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/dataframe.py", line 1774, in _to_table
    query = self._plan.to_proto(self._session.client)
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/plan.py", line 127, in to_proto
    plan.root.CopyFrom(self.plan(session))
  File "/home/xxx/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/plan.py", line 2201, in plan
    plan.map_partitions.profile_id = self._profile.id
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/resource/profile.py", line 132, in id
    rp = _ResourceProfile(
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/resource/profile.py", line 65, in __init__
    self._id = session.client.build_resource_profile(self._remote_profile)
  File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/client/core.py", line 1741, in build_resource_profile
    resp = self._stub.BuildResourceProfile(req)
  File "/home/bobwang/anaconda3/envs/pyspark/lib/python3.10/site-packages/grpc/_channel.py", line 1160, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/bobwang/anaconda3/envs/pyspark/lib/python3.10/site-packages/grpc/_channel.py", line 1003, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "The number of cores per executor (=4) has to be >= the number of cpus per task = 5."
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"The number of cores per executor (=4) has to be >= the number of cpus per task = 5.", grpc_status:13, created_time:"2024-02-26T10:42:37.331616664+08:00"}"
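As the error message says, a task cannot request more CPUs than an executor has. A hedged sketch of one way around it, which the dynamic-allocation tests below use: also request larger executors via ExecutorResourceRequests (this relies on dynamic allocation being enabled so that new executors can be acquired; df is reused from the snippet above):

```python
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

# Request executors big enough to hold a 5-CPU task; with only the default
# 4-core executors, task.cpus=5 is rejected as shown above.
ereqs = ExecutorResourceRequests().cores(6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).require(ereqs).build

df.repartition(3).mapInArrow(lambda it: it, df.schema, False, rp).collect()
```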

@wbo4958 (Contributor, Author) commented Feb 26, 2024

With dynamic allocation enabled.

start-connect-server.sh --master spark://192.168.0.106:7077 \
  --jars jars/spark-connect_2.13-4.0.0-SNAPSHOT.jar \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=1

The above command enables dynamic allocation and caps the maximum number of executors at 1 for the test. Then launch the Spark Connect PySpark client with:

pyspark --remote "sc://localhost"

TaskResourceProfile without any specific executor request information

Test code:

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)

treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()

The rp is a TaskResourceProfile without any specific executor request information, so the executor settings fall back to the default values from the default ResourceProfile (executor.cores=4).

The above code requests an extra executor with the same executor.cores/memory as the default ResourceProfile.
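To double-check what the profile itself carries (a small sketch using the rp built above; since only task requests were attached, the profile's own executor requests are expected to be empty and the defaults apply server-side):

```python
print(rp.taskResources)      # task-side request: cpus=3
print(rp.executorResources)  # expected to be empty: {} -> default executor settings apply
```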


Different executor request information

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)

ereqs = ExecutorResourceRequests().cores(6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).require(ereqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()


@wbo4958 wbo4958 marked this pull request as ready for review February 26, 2024 03:45
@wbo4958 (Contributor, Author) commented Feb 26, 2024

Hi @tgravescs @WeichenXu123 @zhengruifeng @Ngone51, Could you also please help review it. Thx.

@wbo4958 (Contributor, Author) commented Feb 29, 2024

Hi @tgravescs, This PR changed ResourceProfile a little bit to support connect, Could you help review it? Thx very much.

}
}

message BuildResourceProfileRequest {
Reviewer (Member):

This would need @grundprinzip and @hvanhovell 's review.

@HyukjinKwon (Member) commented:

Looks fine in general

@wbo4958 (Contributor, Author) commented Mar 6, 2024

Hi @HyukjinKwon, Could you help review again, thx very much.

@tgravescs (Contributor) commented:

> Does this PR introduce any user-facing change?
> Yes. Users can pass a ResourceProfile to mapInPandas/mapInArrow through the Spark Connect PySpark client.

I think you are adding the ResourceProfile API to Spark Connect for anything to use, correct? I guess since Spark Connect doesn't have SparkContext support, it's only usable by these APIs? It would be nice to have more info in the description about what you are adding and whether it matches the current Python APIs exactly.

@HyukjinKwon (Member) left a comment:

Looks fine to me. The only thing I would like to make sure of is that the protobuf definition part is in good shape (https://github.com/apache/spark/pull/45232/files#r1510814720), which @hvanhovell and @grundprinzip have a better understanding of.

cc @zhengruifeng, @WeichenXu123 and @ueshin too in case you have some feedback.

"pyspark.resource.profile",
# unittests
"pyspark.resource.tests.test_resources",
"pyspark.resource.tests.test_connect_resources",
@zhengruifeng (Contributor) commented Mar 7, 2024:

This test is for Spark Connect; I think we should move it to the pyspark_connect module?

Or maybe we can move its test cases into pyspark.sql.tests.connect.test_resources?

wbo4958 (Contributor, Author) replied:

This pull request already includes pyspark.sql.tests.connect.test_resources to test the general mapInPandas/mapInArrow functionality with ResourceProfile. On the other hand, pyspark.resource.tests.test_connect_resources is specifically for testing special cases like creating a ResourceProfile before establishing a remote session. Therefore, it seems appropriate to keep the tests in their respective locations.

import pyspark.sql.connect.proto as pb2


class ResourceProfile:
Reviewer (Contributor):

is this class user-facing?

wbo4958 (Contributor, Author) replied:

This ResourceProfile is not user-facing; it is primarily used internally to create the resource profile on the server side and retrieve the associated resource profile ID.

@wbo4958 (Contributor, Author) commented Mar 7, 2024

> Does this PR introduce any user-facing change?
> Yes. Users can pass a ResourceProfile to mapInPandas/mapInArrow through the Spark Connect PySpark client.

> I think you are adding the ResourceProfile API to Spark Connect for anything to use, correct? I guess since Spark Connect doesn't have SparkContext support, it's only usable by these APIs? It would be nice to have more info in the description about what you are adding and whether it matches the current Python APIs exactly.

There are no APIs changed or added for ResourceProfile; this PR only changed ResourceProfile internally to support Spark Connect (see the two linked code changes).



class ResourceProfileTests(unittest.TestCase):
    def test_profile_before_sc_for_connect(self):
Reviewer (Contributor):

Are any tests needed for error checking/failure cases, and to make sure errors get to the user properly?

wbo4958 (Contributor, Author) replied:

Added the error checking.

@tgravescs (Contributor) commented:

changes look fine to me

@grundprinzip (Contributor) left a comment:

I just looked at the protocol for now:

  • why a new RPC?
  • what is the lifecycle of the resource profiles
  • why not attach the profile directly to the map operation instead of registering it? Then you can tie the lifetime to the query.

optional bool is_barrier = 3;

// (Optional) ResourceProfile id used for the stage level scheduling.
optional int32 profile_id = 4;
Reviewer (Contributor):

Shouldn't these be UUIDs? Just to make sure that we have an intentional difference and no off-by-one?

@wbo4958 (Contributor, Author) commented Mar 11, 2024:

Hi @grundprinzip,

I'm not quite following you about "by uuids".

So the basic implementation is:

  1. The client creates a ResourceProfile.
  2. When the profile ID of the ResourceProfile is accessed for the first time, the client asks the server to create the ResourceProfile and add it to the ResourceProfileManager on the server side; the server returns the profile ID, and the client sets that id on its ResourceProfile.
  3. The internal mapInPandas/mapInArrow only uses the ResourceProfile id, and the server side can look the ResourceProfile up in the ResourceProfileManager by that id (see the sketch below).
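A minimal sketch of that flow (illustrative only; class, method, and parameter names here are simplified stand-ins, not the exact code in this PR):

```python
from typing import Dict, Optional


class ClientResourceProfile:
    """Illustrative client-side profile mirroring the three steps above."""

    def __init__(self, client, task_requests: Dict[str, float],
                 executor_requests: Optional[Dict[str, int]] = None):
        self._client = client                    # hypothetical connect client handle
        self._task_requests = task_requests      # e.g. {"cpus": 3}
        self._executor_requests = executor_requests or {}
        self._id: Optional[int] = None           # unknown until the server registers it

    @property
    def id(self) -> int:
        # Step 2: on first access, ask the server to create the profile and add it
        # to its ResourceProfileManager; cache the returned id on the client.
        if self._id is None:
            self._id = self._client.create_resource_profile(
                self._task_requests, self._executor_requests
            )
        return self._id


# Step 3: mapInPandas/mapInArrow only put profile.id into the plan; the server
# resolves the actual ResourceProfile from its ResourceProfileManager by id.
```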

optional bool is_barrier = 3;

// (Optional) ResourceProfile id used for the stage level scheduling.
optional int32 profile_id = 4;
Reviewer (Contributor):

Why do we need an extra RPC for that? Can't you attach the resource profile directly to this call?

wbo4958 (Contributor, Author) replied:

Hi @grundprinzip, the user still needs to know the exact ResourceProfile id; if we attach the resource profile directly to this call, it seems we can't get the id back from it.

}

// Build ResourceProfile and get the profile id
rpc BuildResourceProfile(BuildResourceProfileRequest) returns (BuildResourceProfileResponse) {}
Reviewer (Contributor):

Why does this need to be an extra RPC and not just a command?

wbo4958 (Contributor, Author) replied:

Hi @grundprinzip, really good suggestion; I just made a new commit that moves it to a command.

val rp = transformResourceProfile(request.getProfile)

val session = holder.session
session.sparkContext.resourceProfileManager.addResourceProfile(rp)
Reviewer (Contributor):

How are these cleaned up?

wbo4958 (Contributor, Author) replied:

Yeah, neither ResourceProfile nor ResourceProfileManager has any cleanup right now. If you think we need cleanup, we can file another PR for it.

@wbo4958 wbo4958 requested a review from grundprinzip March 11, 2024 23:28
@wbo4958 (Contributor, Author) commented Mar 11, 2024

Hi @grundprinzip, Could you help review it again?

@wbo4958 (Contributor, Author) commented Mar 13, 2024

Hi @grundprinzip, I would be grateful if you could kindly take another look at this PR, Thx.

@wbo4958 (Contributor, Author) commented Mar 20, 2024

Hi @grundprinzip, @HyukjinKwon, @zhengruifeng, This PR has been there for a while, could you help review/merge it? Thx

ResultComplete result_complete = 14;

// Response for command that creates ResourceProfile.
CreateResourceProfileCommandResult create_resource_profile_command_result = 20;
Reviewer (Contributor):

Isn't the next ID 17 here?

wbo4958 (Contributor, Author) replied:

Hi @grundprinzip, Thx for reviewing, Changed it to 17. Could you help review it again?

@grundprinzip (Contributor) left a comment:

In contrast to the regular Spark API, this implementation doesn't manage the lifecycle of the resource request. Can you create a follow-up JIRA that removes the resource request from the SparkContext again?

@wbo4958 (Contributor, Author) commented Apr 1, 2024

> In contrast to the regular Spark API, this implementation doesn't manage the lifecycle of the resource request. Can you create a follow-up JIRA that removes the resource request from the SparkContext again?

Hi @grundprinzip. Done, created a JIRA task.

@wbo4958 wbo4958 requested a review from grundprinzip April 1, 2024 11:51
@grundprinzip (Contributor) left a comment:

Please make sure that the follow-up work does not get lost.

@wbo4958 (Contributor, Author) commented Apr 1, 2024

> Please make sure that the follow-up work does not get lost.

Sure, I will get it done.

@wbo4958 (Contributor, Author) commented Apr 1, 2024

Hi @HyukjinKwon, Could you help merge it? Thx

@HyukjinKwon (Member) commented:

Merged to master.

@wbo4958 wbo4958 deleted the connect-rp branch April 2, 2024 02:07
dongjoon-hyun added a commit that referenced this pull request Apr 4, 2024
…f pandas is not available

### What changes were proposed in this pull request?

This is a follow-up of the following PRs to skip `pandas`-related tests if pandas is not available.
- #44852
- #45232

### Why are the changes needed?

`pandas` is an optional dependency. We had better skip the related tests instead of causing failures.
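A minimal sketch of the skip pattern (probing pandas availability locally; PySpark's test utilities expose a similar flag, but the exact helper names are not shown here):

```python
import unittest

try:
    import pandas  # noqa: F401
    have_pandas = True
except ImportError:
    have_pandas = False


@unittest.skipIf(not have_pandas, "pandas is not installed")
class ResourceProfileTests(unittest.TestCase):
    def test_profile_before_sc_for_connect(self):
        # Real test body omitted; it builds a ResourceProfile before any
        # remote session exists.
        pass
```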

To recover the PyPy 3.8 CI,
- https://github.com/apache/spark/actions/runs/8541011879/job/23421483071

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually test.
```
$ python/run-tests --modules=pyspark-resource --parallelism=1 --python-executables=python3.10
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python modules: ['pyspark-resource']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.13
Starting test(python3.10): pyspark.resource.profile (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/021bc7bb-242f-4cb4-8584-11ed6e711f78/python3.10__pyspark.resource.profile__jn89f1hh.log)
Finished test(python3.10): pyspark.resource.profile (1s)
Starting test(python3.10): pyspark.resource.tests.test_connect_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/244d6c6f-8799-4a2a-b7a7-20d7c50d643d/python3.10__pyspark.resource.tests.test_connect_resources__5ta1tf6e.log)
Finished test(python3.10): pyspark.resource.tests.test_connect_resources (0s) ... 1 tests were skipped
Starting test(python3.10): pyspark.resource.tests.test_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/671e7afa-e764-443f-bc40-7e940d7342ea/python3.10__pyspark.resource.tests.test_resources__lhbp6y5f.log)
Finished test(python3.10): pyspark.resource.tests.test_resources (2s) ... 1 tests were skipped
Tests passed in 4 seconds

Skipped tests in pyspark.resource.tests.test_connect_resources with python3.10:
      test_profile_before_sc_for_connect (pyspark.resource.tests.test_connect_resources.ResourceProfileTests) ... skip (0.005s)

Skipped tests in pyspark.resource.tests.test_resources with python3.10:
      test_profile_before_sc_for_sql (pyspark.resource.tests.test_resources.ResourceProfileTests) ... skip (0.001s)
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45869 from dongjoon-hyun/SPARK-46812.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>