Spark Structured Streaming checkpointing integration test #459
Conversation
Force-pushed 48dc64d to 9589ab1
elezar left a comment
Thanks @akirillov.
I have made some comments regarding the Python testing infrastructure here. It's a good cleanup, so these are generally minor. I didn't spend too much time reviewing the Spark code specifically.
tests/fixtures/__init__.py
Outdated
this_file_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'tests')))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'testing')))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'spark-testing')))
(no newline at end of file)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: missing trailing newline. Suggested change (add a newline at the end of the file):
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'spark-testing')))
added
tests/fixtures/hdfs.py
Outdated
    additional_options=service_kerberos_options,
    timeout_seconds=30 * 60)

yield kerberos_env
Why does this yield kerberos_env? This fixture doesn't modify the env at all (as far as I can tell).
removed in favour of using kerberos_env throughout the dependent tests and fixtures.
tests/fixtures/hdfs.py
Outdated
finally:
    sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME)
    kerberos_env.cleanup()
In my opinion, this should be handled in the fixture providing kerberos_env.
cleanup moved to KDC fixture
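For context, a rough sketch of what the consolidated KDC fixture might look like after this change (assuming sdk_auth.KerberosEnvironment is the helper used to stand up the KDC; principal/keytab setup is elided):

import pytest
import sdk_auth


@pytest.fixture(scope='session')
def kerberos_env():
    # Stand the KDC up once per test session; because cleanup happens here,
    # the HDFS and Kafka fixtures no longer need to call kerberos_env.cleanup().
    kerberos = sdk_auth.KerberosEnvironment()
    try:
        yield kerberos
    finally:
        kerberos.cleanup()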
@pytest.fixture(scope='session')
def setup_hdfs_client(hdfs_with_kerberos):
It is strange that this depends on hdfs_with_kerberos without accessing it. Should the client depend on the service in some way?
The main intent of this dependency is to enforce ordering so that the HDFS client is never created before the HDFS service. What would be a better way of implementing this?
I realise that it's about ordering, so that's fine, but the dependency is not really explicit here. It takes knowledge of the client itself to know that it requires an HDFS service with the name hdfs.
What I have used in the past is something like: https://github.com/mesosphere/dcos-commons/blob/d83139ac0fb9c53719a9e5637a5704ae9dfe23d5/frameworks/hdfs/tests/test_kerberos_auth.py#L101
where hdfs_with_kerberos is a dict which contains the required configuration and this is explicitly passed to the client. It's not a requirement for this PR, though.
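For illustration, a minimal sketch of that pattern using the fixture names from this PR; install_hdfs_client is a hypothetical helper standing in for whatever actually configures the client:

import pytest
import sdk_auth


@pytest.fixture(scope='session')
def hdfs_with_kerberos(kerberos_env):
    # Install the HDFS service here (elided), then expose the configuration
    # that dependents actually need instead of relying on install ordering.
    yield {
        "service_name": "hdfs",            # the service name the client must target
        "kerberos_realm": sdk_auth.REALM,  # realm used when the service was kerberized
    }


@pytest.fixture(scope='session')
def setup_hdfs_client(hdfs_with_kerberos):
    # The dependency is now explicit: the client is configured from values
    # yielded by the service fixture rather than by naming convention.
    install_hdfs_client(service_name=hdfs_with_kerberos["service_name"])  # hypothetical helper
    yield hdfs_with_kerberos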
tests/fixtures/hdfs.py
Outdated
| "enabled": True, | ||
| "realm": sdk_auth.REALM, | ||
| "kdc": { | ||
| "hostname": hdfs_with_kerberos.get_host(), |
I think that we should explicitly use kerberos_env here.
switched to kerberos_env
tests/fixtures/kafka.py
Outdated
finally:
    sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME)
    kerberos_env.cleanup()
    print('noop')
(no newline at end of file)
Suggested change: remove the print('noop') line.
this wasn't intended to be committed, removed
tests/fixtures/kafka.py
Outdated
finally:
    sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME)
    kerberos_env.cleanup()
As is the case with HDFS, we should not cleanup the Kerberos environment here.
cleanup moved to KDC fixture
tests/fixtures/kdc.py
Outdated
yield kerberos_env

finally:
    print('noop')
Suggested change: remove the print('noop') line.
this wasn't intended to be committed, removed
tests/test_checkpointing.py
Outdated
setup_hdfs_paths()

# running kafka producer
messages = ["test"] * 100
Would it make sense to make these messages unique for a specific run, e.g. ["test0", "test1", ...]? Suggested change:
- messages = ["test"] * 100
+ messages = ["test%d" % d for d in range(100)]
Good catch! The new implementation uses two different word sets to verify both the count accuracy and aggregated data checkpointing.
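For illustration, the intuition behind the two word sets (values here are made up; the producer calls and the driver kill/restart are elided):

from collections import Counter

# One word set is produced to Kafka before the driver is killed,
# a different one after it restarts from the HDFS checkpoint.
words_before_failure = ["gandalf"] * 100
words_after_failure = ["frodo"] * 100

# If the job recovers from the checkpoint, the counts for the first set
# survive the restart even though only the second set is produced afterwards;
# a job that silently lost its state would report only the second set.
expected = Counter(words_before_failure) + Counter(words_after_failure)
assert expected == Counter({"gandalf": 100, "frodo": 100})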
tests/test_soak.py
Outdated
import spark_utils as utils
- from tests.test_kafka import KAFKA_PACKAGE_NAME, test_pipeline
+ # from tests.test_kafka import KAFKA_PACKAGE_NAME, test_pipeline
Let's rather remove this than comment it out.
this was commented out by mistake, reverted now.
Force-pushed 751ebf4 to 9c26c8c
thanks for your comments @elezar. I cleaned up the code according to your suggestions and left a comment related to a wider refactoring. The Structured Streaming test itself is modified now to avoid false positives by using different word sets fed into Kafka before and after driver failure.
Force-pushed ea2d212 to 1bcbe0b
elezar left a comment
Thanks @akirillov.
I think the remaining points I had were more specific to any follow-up refactoring work that may be done.
samvantran left a comment
Overall looks good and I appreciate the cleanup/fixtures. Just a couple of comments/questions from my side.
manifest.json
Outdated
| "default_spark_dist": { | ||
| "hadoop_version": "2.7", | ||
| "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz" | ||
| "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-1-SNAPSHOT-bin-2.7.3.tgz" |
Is the appended -1 in 2.3.2-1 from the upstream Spark tar creation or our own? If it's from us, we can probably do away with appending -1 to the actual version, as SNAPSHOT is enough to indicate dev status.
So downloads.mesosphere.com/.../spark-2.3.2-SNAPSHOT-bin.... If it's from upstream, please disregard.
The Jenkins upload job fails if an object with the same key already exists, which complicated replacing SNAPSHOT artifacts. Overall, I think we should still keep some sort of versioning to avoid failures when a snapshot is overridden with broken/incompatible changes.
If this is the case, do we need the word SNAPSHOT at all? We can go with a simpler versioning scheme like 2.3.2.1 or 2.3.2-1 if we make it known that the last digit represents a dev build. Or perhaps the proper thing to do is to tag the file with a date-time.
manifest.json
Outdated
{
    "hadoop_version": "2.7",
-   "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz"
+   "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-1-SNAPSHOT-bin-2.7.3.tgz"
Don't we plan to use hadoop 2.7.7?
There's a separate ticket for porting the CVE commits from branch 2.2 to 2.3; in the current branch, Hadoop is at version 2.7.3. It makes sense to (at least) update the Hadoop dependency in the 2.3 branch first and use the updated tar here, to avoid having an outdated master in spark-build.
Okay let's port the CVE updates first and rebase this
},
"kafka": {
    "default_replication_factor": 3,
    "num_partitions": 32
This is what solved the hanging kafka consumers in the tests, correct?
Yes, this unstuck the Spark Structured Streaming consumer. The spark_kafka test has stabilized too, but it looks like the stuck-consumption issue is still present there (a long freeze before actual consumption starts).
@@ -1,4 +1,4 @@
- lazy val SparkVersion = sys.env.get("SPARK_VERSION").getOrElse("2.2.0")
+ lazy val SparkVersion = sys.env.getOrElse("SPARK_VERSION", "2.3.2")
Not about this file, but I think you accidentally added tests/jobs/scala/.DS_Store. Please remove it (you can also add .DS_Store to .gitignore to guard against this in the future).
thanks for the catch! Will add it to .gitignore and remove the file from the PR.
} catch {
    case t: Throwable =>
        t.printStackTrace()
        Thread.sleep(30*60*1000)
why sleep for 30 minutes?
This was added to ease Kafka producer troubleshooting in case of errors (happens only when an exception is thrown). After verifying functionality it looks like it's not needed anymore.
tests/test_checkpointing.py
Outdated
# Wait until executor is running
LOGGER.info("Starting supervised driver {}".format(driver_task_id))
sdk_tasks.check_running(SPARK_APPLICATION_NAME, 1, timeout_seconds=600)
Can you add expected_task_count=1? Otherwise the lone 1 is a bit mysterious. Or use a named constant, because I see something similar on L117.
good catch. fixed that.
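i.e., something along the lines of the following (expected_task_count is the keyword suggested above; the constants are the ones already used in the test):

sdk_tasks.check_running(SPARK_APPLICATION_NAME, expected_task_count=1, timeout_seconds=600)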
from tests.fixtures.hdfs import HDFS_SERVICE_NAME, HDFS_DATA_DIR, HDFS_HISTORY_DIR
from tests.fixtures.hdfs import HISTORY_PACKAGE_NAME
from tests.fixtures.hdfs import SPARK_SUBMIT_HDFS_KERBEROS_ARGS, KEYTAB_SECRET_PATH, GENERIC_HDFS_USER_PRINCIPAL
nit: need 2 line breaks b/w imports and code
added
@pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false')
@pytest.mark.sanity
- def test_history():
+ def test_history(kerberized_spark, hdfs_with_kerberos, setup_history_server):
if you're depending on kerberized_spark (which internally depends on setup_history_server, hdfs_with_kerberos), do you need to call those two again here?
The main intention here is to explicitly list the dependencies of the test to ease troubleshooting. It's not only this test; a little bit of redundancy also gives a better view of the context.
okay that's fair
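A minimal sketch of the redundancy being discussed, with fixture bodies elided (the dependency chain is as described in the comment above):

import pytest


@pytest.fixture(scope='session')
def kerberized_spark(hdfs_with_kerberos, setup_history_server):
    # Installs Spark with Kerberos settings (body elided).
    yield


def test_history(kerberized_spark, hdfs_with_kerberos, setup_history_server):
    # hdfs_with_kerberos and setup_history_server are already implied by
    # kerberized_spark; listing them again documents what the test relies on.
    ...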
kerberos_args = get_kerberized_kafka_spark_conf(spark_service_name)

- producer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=2",
+ producer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=1",
This will mean that we spawn 2 producers. Is this intended?
yes, the main goal here is to lower the resource demands of a single executor while keeping parallelism at the same level.
sdk_tasks.check_running(KAFKA_SERVICE_NAME, 1, timeout_seconds=600)

- consumer_config = ["--conf", "spark.cores.max=4", "--class", "KafkaConsumer"] + common_args
+ consumer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=1",
2 producers, 2 consumers - okay, looks like it is intended
yes, it is the same here: fixed-size executors preserving the specified parallelism level.
Force-pushed e2e1ebf to 1889404
samvantran left a comment
Approving since tests pass and my comments are non-blockers (other than updating the tgz to include the latest 2.3 CVE fixes)
Force-pushed 1889404 to a09f417
Force-pushed a09f417 to aa1f35e
What changes were proposed in this pull request?
Resolves DCOS-41580
The new integration test launches a Spark Structured Streaming job with checkpointing backed by HDFS, feeds test data to Kafka, and verifies that, if a failure happens, the aggregated data is recovered from a checkpoint rather than reprocessed. All components are Kerberized.
Apart from the test, this PR refactors the KDC/HDFS/Kafka-dependent tests so that these components can be reused across tests without being reinstalled every time.
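For context, a minimal PySpark sketch of a checkpointed Kafka word count of this shape (the actual job in this PR is the Scala KafkaConsumer; the broker address, topic, and checkpoint path below are placeholders, and the spark-sql-kafka package is assumed to be on the classpath):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession.builder.appName("CheckpointedWordCount").getOrCreate()

# Read words from a Kafka topic; broker and topic names are placeholders.
lines = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker.kafka.l4lb.thisdcos.directory:9092")
         .option("subscribe", "test-topic")
         .load()
         .selectExpr("CAST(value AS STRING) AS value"))

counts = (lines
          .select(explode(split(col("value"), r"\s+")).alias("word"))
          .groupBy("word")
          .count())

# The HDFS checkpoint directory is what lets a restarted driver recover its
# aggregation state and Kafka offsets instead of starting from scratch.
# The console sink is a placeholder for the real output used by the test.
query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("checkpointLocation", "hdfs:///checkpoints/wordcount")
         .start())

query.awaitTermination()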
Changes:
How were these changes tested?
Integration tests from this repo
Release Notes