expanded test for more cases
BryanCutler committed Oct 30, 2018
commit 0d77b0051546eb3a178a06a95f35ccb20ddeeff6
24 changes: 20 additions & 4 deletions python/pyspark/sql/tests.py
@@ -4435,10 +4435,26 @@ def test_timestamp_dst(self):
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
     def test_toPandas_batch_order(self):
-        df = self.spark.range(64, numPartitions=8).toDF("a")
-        with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 4}):
-            pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
-            self.assertPandasEqual(pdf, pdf_arrow)
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
BryanCutler (Member, Author) commented:
@holdenk and @felixcheung , I didn't do a loop but chose some different levels of partition numbers to be a bit more sure that partitions won't end up in order. I also added some other cases of different partition/batch ratios. Let me know if you think we need more to be sure here.

Contributor commented:
I don't see how we're guaranteeing out-of-order from the JVM. Could we delay on one of the early partitions to guarantee out of order?

BryanCutler (Member, Author) replied on Nov 6, 2018:
Yeah it's not a guarantee, but with a large num of partitions, it's a pretty slim chance they will all be in order. I can also add a case with some delay. My only concern is how big to make the delay to be sure it's enough without adding wasted time to the tests.

How about we keep the case with a large number of partitions and add a case with 100ms delay on the first partition?
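
(A minimal sketch of what such a delayed-first-partition case could look like, assuming it lives inside the same test method; `delay_first_part` is a hypothetical helper named here for illustration, and the actual follow-up commit may differ.)

```python
import time

def delay_first_part(partition_index, iterator):
    # Sleep briefly only in partition 0 so later partitions are likely
    # to reach the driver first, forcing out-of-order batch collection.
    if partition_index == 0:
        time.sleep(0.1)
    return iterator

df = self.spark.range(64, numPartitions=8).toDF("a")
delayed_df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}):
    pdf, pdf_arrow = self._toPandas_arrow_toggle(delayed_df)
    self.assertPandasEqual(pdf, pdf_arrow)
```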

BryanCutler (Member, Author) commented:
@holdenk , I updated the tests, please take another look when you get a chance. Thanks!

Contributor replied:
I like the new tests; I think a 0.1s delay on one of the partitions is enough.


+
+        for case in cases:
+            run_test(num_records=case[0], num_parts=case[1], max_records=case[2])


     @unittest.skipIf(
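
For readers of the diff: `_toPandas_arrow_toggle` and `assertPandasEqual` are helpers defined elsewhere in tests.py, outside this hunk. A rough sketch of what the toggle helper presumably does, based on its name and how it is called above (an assumption, not the actual body): collect the same DataFrame once with Arrow disabled and once with Arrow enabled, so the test can compare the two results.

```python
def _toPandas_arrow_toggle(self, df):
    # Sketch only: assumes Arrow execution is enabled by the test class setup.
    with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
        pdf = df.toPandas()       # plain (non-Arrow) collection
    pdf_arrow = df.toPandas()     # Arrow-based collection
    return pdf, pdf_arrow
```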