From ea5d2ae401fee5a604a5ccfa473d8dfda4739e04 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 28 Jun 2024 12:36:19 +0900 Subject: [PATCH] Add eventually for terminated event assert --- .../sql/tests/connect/streaming/test_parity_listener.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py index a785fe47f9a1..14edfa4003b2 100644 --- a/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py +++ b/python/pyspark/sql/tests/connect/streaming/test_parity_listener.py @@ -23,6 +23,7 @@ from pyspark.sql.streaming.listener import StreamingQueryListener from pyspark.sql.functions import count, lit from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.utils import eventually # Listeners that has spark commands in callback handler functions @@ -160,8 +161,12 @@ def test_slow_query(self): self.assertTrue(slow_query.id in [str(e.progress.id) for e in listener.progress]) self.assertTrue(fast_query.id in [str(e.progress.id) for e in listener.progress]) - self.assertTrue(slow_query.id in [str(e.id) for e in listener.terminated]) - self.assertTrue(fast_query.id in [str(e.id) for e in listener.terminated]) + eventually(timeout=20, catch_assertions=True)( + lambda: self.assertTrue(slow_query.id in [str(e.id) for e in listener.terminated]) + )() + eventually(timeout=20, catch_assertions=True)( + lambda: self.assertTrue(fast_query.id in [str(e.id) for e in listener.terminated]) + )() finally: for listener in self.spark.streams._sqlb._listener_bus: