Skip to content

Commit

Permalink
add test case with delay
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Nov 6, 2018
1 parent 6457e42 commit bf2feec
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4925,25 +4925,31 @@ def test_timestamp_dst(self):

def test_toPandas_batch_order(self):

def delay_first_part(partition_index, iterator):
if partition_index == 0:
time.sleep(0.1)
return iterator

# Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
def run_test(num_records, num_parts, max_records):
def run_test(num_records, num_parts, max_records, use_delay=False):
df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
if use_delay:
df = df.rdd.mapPartitionsWithIndex(delay_first_part).toDF()
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
self.assertPandasEqual(pdf, pdf_arrow)

cases = [
(1024, 512, 2), # Try large num partitions for good chance of not collecting in order
(512, 64, 2), # Try medium num partitions to test out of order collection
(64, 8, 2), # Try small number of partitions to test out of order collection
(64, 64, 1), # Test single batch per partition
(64, 1, 64), # Test single partition, single batch
(64, 1, 8), # Test single partition, multiple batches
(30, 7, 2), # Test different sized partitions
(1024, 512, 2), # Use large num partitions for more likely collecting out of order
(64, 8, 2, True), # Use delay in first partition to force collecting out of order
(64, 64, 1), # Test single batch per partition
(64, 1, 64), # Test single partition, single batch
(64, 1, 8), # Test single partition, multiple batches
(30, 7, 2), # Test different sized partitions
]

for case in cases:
run_test(num_records=case[0], num_parts=case[1], max_records=case[2])
run_test(*case)


class EncryptionArrowTests(ArrowTests):
Expand Down

0 comments on commit bf2feec

Please sign in to comment.