[SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs #24981
Changes from 1 commit
python/pyspark/serializers.py

```diff
@@ -356,33 +356,6 @@ def __repr__(self):
         return "ArrowStreamPandasSerializer"
 
 
-class InterleavedArrowReader(object):
-
-    def __init__(self, stream):
-        self._stream = stream
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        dataframes_in_group = read_int(self._stream)
-        if dataframes_in_group == 2:
-            return self._read_df(), self._read_df()
-        elif dataframes_in_group == 0:
-            raise StopIteration
-        else:
-            raise ValueError(
-                'Received Invalid number of dataframes in group {0}'.format(dataframes_in_group))
-
-    def next(self):
-        return self.__next__()
-
-    def _read_df(self):
-        import pyarrow as pa
-        reader = pa.ipc.open_stream(self._stream)
-        return [b for b in reader]
-
-
 class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
     """
     Serializer used by Python worker to evaluate Pandas UDFs
```
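For context, the helper removed above parsed a simple wire framing: for each co-group the JVM writes an int (2 meaning "a group follows", 0 meaning end of stream), followed by two back-to-back Arrow IPC streams, one per side of the cogroup. A minimal, self-contained sketch of that framing, using plain struct packing in place of pyspark's read_int/write_int and hypothetical helper names, could look like this:

```python
import io
import struct

import pandas as pd
import pyarrow as pa


def write_group(sink, pdf1, pdf2):
    # One co-group: an int count (2), then two complete Arrow IPC streams.
    sink.write(struct.pack("!i", 2))
    for pdf in (pdf1, pdf2):
        batch = pa.RecordBatch.from_pandas(pdf, preserve_index=False)
        writer = pa.ipc.new_stream(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()


def read_groups(stream):
    # Mirror of the framing: 2 means a group follows, 0 means end of stream.
    while True:
        (count,) = struct.unpack("!i", stream.read(4))
        if count == 0:
            return
        assert count == 2, count
        left = pa.ipc.open_stream(stream).read_all().to_pandas()
        right = pa.ipc.open_stream(stream).read_all().to_pandas()
        yield left, right


buf = io.BytesIO()
write_group(buf, pd.DataFrame({"id": [1], "v": [1.0]}), pd.DataFrame({"id": [1], "w": [2.0]}))
buf.write(struct.pack("!i", 0))  # end-of-stream marker
buf.seek(0)
for left, right in read_groups(buf):
    print(left)
    print(right)
```

This commit drops the standalone class in favour of doing the same reads inline in CogroupUDFSerializer.load_stream below, reusing ArrowStreamSerializer.load_stream for each of the two streams.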
```diff
@@ -428,21 +401,31 @@ def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"
 
 
-class PandasCogroupSerializer(ArrowStreamPandasUDFSerializer):
+class CogroupUDFSerializer(ArrowStreamPandasUDFSerializer):
 
     def __init__(self, timezone, safecheck, assign_cols_by_name):
-        super(PandasCogroupSerializer, self).__init__(timezone, safecheck, assign_cols_by_name)
+        super(CogroupUDFSerializer, self).__init__(timezone, safecheck, assign_cols_by_name)
 
     def load_stream(self, stream):
         """
         Deserialize Cogrouped ArrowRecordBatches to a tuple of Arrow tables and return as two
         lists of pandas.Series.
         """
-        reader = InterleavedArrowReader(stream)
-        for batch1, batch2 in reader:
-            import pyarrow as pa
-            yield ([self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch1).itercolumns()],
-                   [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch2).itercolumns()])
+        import pyarrow as pa
+        dataframes_in_group = None
+
+        while dataframes_in_group is None or dataframes_in_group > 0:
+            dataframes_in_group = read_int(stream)
+
+            if dataframes_in_group == 2:
+                batch1 = [batch for batch in ArrowStreamSerializer.load_stream(self, stream)]
+                batch2 = [batch for batch in ArrowStreamSerializer.load_stream(self, stream)]
+                yield ([self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch1).itercolumns()],
+                       [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch2).itercolumns()])
+
+            elif dataframes_in_group != 0:
+                raise ValueError(
+                    'Received Invalid number of dataframes in group {0}'.format(dataframes_in_group))
 
 
 class BatchedSerializer(Serializer):
```
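The two lists yielded per co-group come from Table.from_batches(...).itercolumns(): all batches of one side are concatenated into a table, and each column converts to a single pandas.Series. arrow_to_pandas layers timezone and type handling on top of that; a bare pyarrow/pandas illustration of the shape (hypothetical data) is:

```python
import pandas as pd
import pyarrow as pa

# Two record batches belonging to the same side of one co-group.
batch1 = pa.RecordBatch.from_pandas(
    pd.DataFrame({"id": [1, 1], "v": [1.0, 2.0]}), preserve_index=False)
batch2 = pa.RecordBatch.from_pandas(
    pd.DataFrame({"id": [1], "v": [3.0]}), preserve_index=False)

# from_batches concatenates the batches; itercolumns yields one ChunkedArray per
# column, and each converts to a single Series spanning all of the batches.
table = pa.Table.from_batches([batch1, batch2])
series_list = [column.to_pandas() for column in table.itercolumns()]

for name, series in zip(table.column_names, series_list):
    print(name, list(series))
# id [1, 1, 1]
# v [1.0, 2.0, 3.0]
```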
python/pyspark/worker.py

```diff
@@ -39,7 +39,7 @@
 from pyspark.rdd import PythonEvalType
 from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
     write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
-    BatchedSerializer, ArrowStreamPandasUDFSerializer, PandasCogroupSerializer
+    BatchedSerializer, ArrowStreamPandasUDFSerializer, CogroupUDFSerializer
 from pyspark.sql.types import to_arrow_type, StructType
 from pyspark.util import _get_argspec, fail_on_stopiteration
 from pyspark import shuffle
```
```diff
@@ -314,7 +314,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
         # pandas Series. See SPARK-27240.
         if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
-            ser = PandasCogroupSerializer(timezone, safecheck, assign_cols_by_name)
+            ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
         else:
             df_for_struct = (eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF or
                              eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF or
```
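For reference, the worker only sees SQL_COGROUPED_MAP_PANDAS_UDF when the driver-side cogroup API is used. The sketch below shows that path as it eventually shipped in Spark 3.x (groupby(...).cogroup(...).applyInPandas); the user-facing name was still being discussed on this PR, so treat the exact spelling as illustrative:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

df1 = spark.createDataFrame([(1, 1.0), (2, 2.0)], ("id", "v"))
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ("id", "w"))


def merge(left, right):
    # Called once per key, with the matching rows of both inputs as pandas DataFrames.
    return pd.merge(left, right, on="id")


result = (df1.groupby("id")
          .cogroup(df2.groupby("id"))
          .applyInPandas(merge, schema="id long, v double, w string"))
result.show()
```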
```diff
@@ -418,8 +418,8 @@ def extract_key_value_indexes():
             pickleSer, infile, eval_type, runner_conf, udf_index=0)
         udfs['f'] = udf
         parsed_offsets = extract_key_value_indexes()
-        keys = ["a[%d]" % o for o in parsed_offsets[0][0]]
-        vals = ["a[%d]" % o for o in parsed_offsets[0][1]]
+        keys = ["a[%d]" % (o,) for o in parsed_offsets[0][0]]
+        vals = ["a[%d]" % (o,) for o in parsed_offsets[0][1]]
         mapper_str = "lambda a: f([%s], [%s])" % (", ".join(keys), ", ".join(vals))
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         # We assume there is only one UDF here because cogrouped map doesn't
```
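The change from `% o` to `% (o,)` in these two hunks looks defensive rather than functional: with old-style formatting, a bare right-hand operand that happens to be a tuple is unpacked into multiple format arguments, while a one-element tuple always passes `o` through as a single value. A quick illustration of the difference:

```python
# With a plain int both spellings agree.
o = 3
assert "a[%d]" % o == "a[%d]" % (o,) == "a[3]"

# If o were a tuple, the bare form would try to unpack it as format arguments.
try:
    o = (3, 4)
    "a[%d]" % o
except TypeError as exc:
    print("bare form failed:", exc)  # not all arguments converted during string formatting

# Wrapping in a one-element tuple always treats o as a single value.
print("a[%s]" % (o,))  # a[(3, 4)]
```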
```diff
@@ -429,10 +429,10 @@ def extract_key_value_indexes():
             pickleSer, infile, eval_type, runner_conf, udf_index=0)
         udfs['f'] = udf
         parsed_offsets = extract_key_value_indexes()
-        df1_keys = ["a[0][%d]" % o for o in parsed_offsets[0][0]]
-        df1_vals = ["a[0][%d]" % o for o in parsed_offsets[0][1]]
-        df2_keys = ["a[1][%d]" % o for o in parsed_offsets[1][0]]
-        df2_vals = ["a[1][%d]" % o for o in parsed_offsets[1][1]]
+        df1_keys = ["a[0][%d]" % (o,) for o in parsed_offsets[0][0]]
+        df1_vals = ["a[0][%d]" % (o,) for o in parsed_offsets[0][1]]
+        df2_keys = ["a[1][%d]" % (o,) for o in parsed_offsets[1][0]]
+        df2_vals = ["a[1][%d]" % (o,) for o in parsed_offsets[1][1]]
         mapper_str = "lambda a: f([%s], [%s], [%s], [%s])" % (
             ", ".join(df1_keys), ", ".join(df1_vals), ", ".join(df2_keys), ", ".join(df2_vals))
     else:
```
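To make the cogrouped branch concrete: with hypothetical parsed_offsets of [[[0], [1]], [[0], [1]]], the generated source is `lambda a: f([a[0][0]], [a[0][1]], [a[1][0]], [a[1][1]])`, where `a` is one tuple yielded by CogroupUDFSerializer.load_stream (the list of Series for the first DataFrame, then the list for the second). A standalone sketch of the string building and the resulting call, with a stand-in `f`:

```python
# Hypothetical offsets: for each DataFrame, column 0 is a key column, column 1 a value column.
parsed_offsets = [[[0], [1]], [[0], [1]]]

df1_keys = ["a[0][%d]" % (o,) for o in parsed_offsets[0][0]]
df1_vals = ["a[0][%d]" % (o,) for o in parsed_offsets[0][1]]
df2_keys = ["a[1][%d]" % (o,) for o in parsed_offsets[1][0]]
df2_vals = ["a[1][%d]" % (o,) for o in parsed_offsets[1][1]]
mapper_str = "lambda a: f([%s], [%s], [%s], [%s])" % (
    ", ".join(df1_keys), ", ".join(df1_vals), ", ".join(df2_keys), ", ".join(df2_vals))
print(mapper_str)
# lambda a: f([a[0][0]], [a[0][1]], [a[1][0]], [a[1][1]])


def f(keys1, vals1, keys2, vals2):
    # Stand-in for the wrapped cogrouped UDF; just echoes what it was handed.
    return keys1, vals1, keys2, vals2


mapper = eval(mapper_str, {"f": f})
# 'a' mimics one element yielded by load_stream: (series of df1, series of df2).
print(mapper((["df1-key-series", "df1-val-series"], ["df2-key-series", "df2-val-series"])))
```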
Review comment on CogroupUDFSerializer.__init__ (python/pyspark/serializers.py):
nit: seems like this only calls the super constructor. I don't think we need this.
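The nit is that an override which only forwards its arguments to super().__init__ can simply be deleted: a subclass without its own __init__ uses the parent's constructor. A toy illustration (not the Spark classes):

```python
class Parent(object):
    def __init__(self, timezone, safecheck, assign_cols_by_name):
        self.timezone = timezone
        self.safecheck = safecheck
        self.assign_cols_by_name = assign_cols_by_name


class WithInit(Parent):
    # Redundant: adds nothing beyond calling the parent constructor.
    def __init__(self, timezone, safecheck, assign_cols_by_name):
        super(WithInit, self).__init__(timezone, safecheck, assign_cols_by_name)


class WithoutInit(Parent):
    # Equivalent: with no __init__ defined, Parent.__init__ is used directly.
    pass


a = WithInit("UTC", True, False)
b = WithoutInit("UTC", True, False)
assert (a.timezone, a.safecheck, a.assign_cols_by_name) == \
       (b.timezone, b.safecheck, b.assign_cols_by_name)
```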