diff --git a/.gitignore b/.gitignore index cf16da24..e35b4c1f 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,5 @@ cluster_info.json dcos-launch-* env/ venv/ + +.DS_Store \ No newline at end of file diff --git a/Makefile b/Makefile index 7f6098b9..ad1ccc4f 100644 --- a/Makefile +++ b/Makefile @@ -115,10 +115,10 @@ stub-universe-url: docker-dist fi # Special directive to assist in speeding up CI builds -# Generates a master checksum against the source files in tests/jobs/scala/src/* +# Generates a master checksum against the source files in tests/jobs/scala # This is later compared to avoid building identical jars from previous CI runs test-jar-checksum: - find tests/jobs/scala/src/* -type f -exec md5sum '{}' + | sort > checksums + find tests/jobs/scala -type f -exec md5sum '{}' + | sort > checksums md5sum checksums | cut -d ' ' -f1 > test-jar-checksum MD5SUM ?= $(shell cat test-jar-checksum) diff --git a/manifest.json b/manifest.json index 942df0bb..15fe7211 100644 --- a/manifest.json +++ b/manifest.json @@ -1,17 +1,17 @@ { - "spark_version": "2.2.1", + "spark_version": "2.3.2", "default_spark_dist": { "hadoop_version": "2.7", - "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz" + "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.7.7.tgz" }, "spark_dist": [ { "hadoop_version": "2.7", - "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz" + "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.7.7.tgz" }, { "hadoop_version": "2.6", - "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.6.tgz" + "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.6.5.tgz" } ] -} +} \ No newline at end of file diff --git a/spark-testing/spark_utils.py b/spark-testing/spark_utils.py index 7312fc11..3f8b4458 100644 --- a/spark-testing/spark_utils.py +++ b/spark-testing/spark_utils.py @@ -3,6 +3,7 @@ import logging import os import re +import retrying import urllib import urllib.parse @@ -173,6 +174,17 @@ def check_job_output(task_id, expected_output): raise Exception("{} not found in stdout".format(expected_output)) +@retrying.retry( + wait_fixed=5000, + stop_max_delay=600 * 1000, + retry_on_result=lambda res: not res) +def wait_for_running_job_output(task_id, expected_line): + stdout = sdk_cmd.run_cli("task log --lines=1000 {}".format(task_id)) + result = expected_line in stdout + LOGGER.info('Checking for {} in STDOUT:\n{}\nResult: {}'.format(expected_line, stdout, result)) + return result + + def upload_file(file_path): spark_s3.upload_file(file_path) return spark_s3.http_url(os.path.basename(file_path)) diff --git a/testing/sdk_auth.py b/testing/sdk_auth.py index 8c19f77f..529f4414 100644 --- a/testing/sdk_auth.py +++ b/testing/sdk_auth.py @@ -195,7 +195,7 @@ def _install_marathon_app(app_definition): log.info("Found installed KDC app, reusing it") return _get_kdc_task(self.app_definition["id"]) log.info("Found installed KDC app, destroying it first") - sdk_marathon.destroy(self.app_definition["id"]) + sdk_marathon.destroy_app(self.app_definition["id"]) log.info("Installing KDC Marathon app") _install_marathon_app(self.app_definition) @@ -439,12 +439,14 @@ def cleanup(self): sdk_security.install_enterprise_cli() log.info("Removing the marathon KDC app") - sdk_marathon.destroy_app(self.app_definition["id"]) - - if self._temp_working_dir and isinstance(self._temp_working_dir, tempfile.TemporaryDirectory): - 
log.info("Deleting temporary working directory") - self._temp_working_dir.cleanup() - - # TODO: separate secrets handling into another module - log.info("Deleting keytab secret") - sdk_security.delete_secret(self.keytab_secret_path) + if sdk_marathon.app_exists(self.app_definition["id"]): + sdk_marathon.destroy_app(self.app_definition["id"]) + if self._temp_working_dir and isinstance(self._temp_working_dir, tempfile.TemporaryDirectory): + log.info("Deleting temporary working directory") + self._temp_working_dir.cleanup() + + # TODO: separate secrets handling into another module + log.info("Deleting keytab secret") + sdk_security.delete_secret(self.keytab_secret_path) + else: + log.info("KDC app doesn't exist, skipping cleanup") diff --git a/testing/sdk_security.py b/testing/sdk_security.py index 8874e084..07d6e6a7 100644 --- a/testing/sdk_security.py +++ b/testing/sdk_security.py @@ -271,7 +271,7 @@ def cleanup_security(service_name: str, log.info("Finished cleaning up strict-mode security") -def security_session(framework_name: str) -> None: +def security_session(framework_name: str, service_account: str="service-acct", secret: str="secret") -> None: """Create a service account and configure permissions for strict-mode tests. This should generally be used as a fixture in a framework's conftest.py: @@ -283,7 +283,9 @@ def configure_security(configure_universe): try: is_strict = sdk_utils.is_strict_mode() if is_strict: - service_account_info = setup_security(framework_name) + service_account_info = setup_security(service_name=framework_name, + service_account=service_account, + service_account_secret=secret) yield finally: if is_strict: diff --git a/tests/conftest.py b/tests/conftest.py index 3fa91059..6a5a33d0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,11 @@ import pytest import sdk_repository +pytest_plugins = [ + "tests.fixtures.hdfs", + "tests.fixtures.kafka", + "tests.fixtures.kdc" +] @pytest.fixture(scope='session') def configure_universe(): diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 00000000..4c0a8354 --- /dev/null +++ b/tests/fixtures/__init__.py @@ -0,0 +1,7 @@ +import os.path +import sys + +this_file_dir = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'tests'))) +sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'testing'))) +sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'spark-testing'))) diff --git a/tests/fixtures/hdfs.py b/tests/fixtures/hdfs.py new file mode 100644 index 00000000..63b1439e --- /dev/null +++ b/tests/fixtures/hdfs.py @@ -0,0 +1,186 @@ +import base64 +import logging +import json +import os +import pytest +import sdk_auth +import sdk_cmd +import sdk_install +import sdk_marathon +import sdk_security +import sdk_utils +import shakedown +import spark_utils as utils + +from tests import hdfs_auth + +LOGGER = logging.getLogger(__name__) + +DEFAULT_HDFS_TASK_COUNT = 10 +GENERIC_HDFS_USER_PRINCIPAL = "hdfs@{realm}".format(realm=sdk_auth.REALM) +ALICE_USER = "alice" +ALICE_PRINCIPAL = "{user}@{realm}".format(user=ALICE_USER, realm=sdk_auth.REALM) +KEYTAB_SECRET_PATH = os.getenv("KEYTAB_SECRET_PATH", "__dcos_base64___keytab") + +HDFS_PACKAGE_NAME = 'hdfs' +HDFS_SERVICE_NAME = 'hdfs' +HDFS_SERVICE_ACCOUNT = "{}-service-acct".format(HDFS_SERVICE_NAME) +HDFS_SERVICE_ACCOUNT_SECRET = "{}-secret".format(HDFS_SERVICE_NAME) +HISTORY_PACKAGE_NAME = os.getenv("HISTORY_PACKAGE_NAME", 
"spark-history") +HISTORY_SERVICE_NAME = os.getenv("HISTORY_SERVICE_NAME", "spark-history") + +HDFS_DATA_DIR = '/users/{}'.format(ALICE_USER) +HDFS_HISTORY_DIR = '/history' + +HDFS_KRB5_CONF_ORIG = '''[libdefaults] +default_realm = %(realm)s +dns_lookup_realm = true +dns_lookup_kdc = true +udp_preference_limit = 1 + +[realms] + %(realm)s = { + kdc = kdc.marathon.mesos:2500 + } + +[domain_realm] + .hdfs.dcos = %(realm)s + hdfs.dcos = %(realm)s +''' % {"realm": sdk_auth.REALM} # avoid format() due to problems with "{" in string +HDFS_KRB5_CONF = base64.b64encode(HDFS_KRB5_CONF_ORIG.encode('utf8')).decode('utf8') + +SPARK_SUBMIT_HDFS_KERBEROS_ARGS = ["--kerberos-principal", ALICE_PRINCIPAL, + "--keytab-secret-path", "/{}".format(KEYTAB_SECRET_PATH), + "--conf", "spark.mesos.driverEnv.SPARK_USER={}".format(utils.SPARK_USER)] + +HDFS_CLIENT_ID = "hdfsclient" +SPARK_HISTORY_USER = "nobody" + + +@pytest.fixture(scope='session') +def configure_security_hdfs(): + yield from sdk_security.security_session(framework_name=HDFS_SERVICE_NAME, + service_account=HDFS_SERVICE_ACCOUNT, + secret=HDFS_SERVICE_ACCOUNT_SECRET) + + +@pytest.fixture(scope='session') +def hdfs_with_kerberos(configure_security_hdfs, kerberos_options): + try: + additional_options = { + "service": { + "name": HDFS_SERVICE_NAME, + "security": kerberos_options + }, + "hdfs": { + "security_auth_to_local": hdfs_auth.get_principal_to_user_mapping() + } + } + + if sdk_utils.is_strict_mode(): + additional_options["service"]["service_account"] = HDFS_SERVICE_ACCOUNT + additional_options["service"]["principal"] = HDFS_SERVICE_ACCOUNT + additional_options["service"]["service_account_secret"] = HDFS_SERVICE_ACCOUNT_SECRET + additional_options["service"]["secret_name"] = HDFS_SERVICE_ACCOUNT_SECRET + + sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME) + sdk_install.install( + HDFS_PACKAGE_NAME, + HDFS_SERVICE_NAME, + DEFAULT_HDFS_TASK_COUNT, + additional_options=additional_options, + timeout_seconds=30 * 60) + + yield + + finally: + sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME) + + +@pytest.fixture(scope='session') +def setup_hdfs_client(hdfs_with_kerberos): + try: + curr_dir = os.path.dirname(os.path.realpath(__file__)) + app_def_path = "{}/../resources/hdfsclient.json".format(curr_dir) + with open(app_def_path) as f: + hdfsclient_app_def = json.load(f) + hdfsclient_app_def["id"] = HDFS_CLIENT_ID + hdfsclient_app_def["secrets"]["hdfs_keytab"]["source"] = KEYTAB_SECRET_PATH + sdk_marathon.install_app(hdfsclient_app_def) + + sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL) + hdfs_cmd("rm -r -skipTrash {}".format(HDFS_DATA_DIR)) + hdfs_cmd("mkdir -p {}".format(HDFS_DATA_DIR)) + hdfs_cmd("chown {}:users {}".format(ALICE_USER, HDFS_DATA_DIR)) + yield + + finally: + sdk_marathon.destroy_app(HDFS_CLIENT_ID) + + +def hdfs_cmd(cmd): + rc, _, _ = sdk_cmd.marathon_task_exec(HDFS_CLIENT_ID, "bin/hdfs dfs -{}".format(cmd)) + if rc != 0: + raise Exception("HDFS command failed with code {}: {}".format(rc, cmd)) + + +@pytest.fixture(scope='session') +def configure_security_spark(): + yield from utils.spark_security_session() + + +@pytest.fixture(scope='session') +def setup_history_server(hdfs_with_kerberos, setup_hdfs_client, configure_universe): + try: + sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL) + hdfs_cmd("rm -r -skipTrash {}".format(HDFS_HISTORY_DIR)) + hdfs_cmd("mkdir {}".format(HDFS_HISTORY_DIR)) + hdfs_cmd("chmod 1777 
{}".format(HDFS_HISTORY_DIR)) + + sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME) + sdk_install.install( + HISTORY_PACKAGE_NAME, + HISTORY_SERVICE_NAME, + 0, + additional_options={ + "service": { + "name": HISTORY_SERVICE_NAME, + "user": SPARK_HISTORY_USER, + "log-dir": "hdfs://hdfs{}".format(HDFS_HISTORY_DIR), + "hdfs-config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints" + .format(HDFS_SERVICE_NAME) + }, + "security": { + "kerberos": { + "enabled": True, + "krb5conf": HDFS_KRB5_CONF, + "principal": GENERIC_HDFS_USER_PRINCIPAL, + "keytab": KEYTAB_SECRET_PATH + } + } + }, + wait_for_deployment=False, # no deploy plan + insert_strict_options=False) # no standard service_account/etc options + yield + + finally: + sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME) + + +@pytest.fixture(scope='module') +def kerberized_spark(setup_history_server, hdfs_with_kerberos, kerberos_options, configure_security_spark, configure_universe): + try: + additional_options = { + "hdfs": { + "config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints".format(HDFS_SERVICE_NAME) + }, + "security": kerberos_options, + "service": { + "spark-history-server-url": shakedown.dcos_url_path("/service/{}".format(HISTORY_SERVICE_NAME)) + } + } + + utils.require_spark(additional_options=additional_options) + yield + finally: + utils.teardown_spark() diff --git a/tests/fixtures/kafka.py b/tests/fixtures/kafka.py new file mode 100644 index 00000000..251f38d3 --- /dev/null +++ b/tests/fixtures/kafka.py @@ -0,0 +1,94 @@ +import base64 +import logging +import os + +import pytest +import sdk_install +import sdk_security +import sdk_utils +import spark_s3 as s3 + +LOGGER = logging.getLogger(__name__) + +PRODUCER_SERVICE_NAME = "Spark->Kafka Producer" + +DEFAULT_KAFKA_TASK_COUNT = 3 +KERBERIZED_KAFKA = True +KAFKA_KRB5_ORIG = b'''[libdefaults] +default_realm = LOCAL + +[realms] + LOCAL = { + kdc = kdc.marathon.autoip.dcos.thisdcos.directory:2500 + } +''' +KAFKA_KRB5 = base64.b64encode(KAFKA_KRB5_ORIG).decode('utf8') +KAFKA_PACKAGE_NAME = os.getenv("KAFKA_PACKAGE_NAME", "kafka") +KAFKA_SERVICE_NAME = os.getenv("KAFKA_SERVICE_NAME", ("secure-kafka" if KERBERIZED_KAFKA else "kafka")) +KAFKA_SERVICE_ACCOUNT = "{}-service-acct".format(KAFKA_SERVICE_NAME) +KAFKA_SERVICE_ACCOUNT_SECRET = "{}-secret".format(KAFKA_SERVICE_NAME) +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + +KEYTAB_SECRET = "__dcos_base64___keytab" + +def upload_jaas(): + jaas_path = os.path.join(THIS_DIR, "..", "resources", "spark-kafka-client-jaas.conf") + s3.upload_file(jaas_path) + return s3.http_url("spark-kafka-client-jaas.conf") + + +def get_kerberized_kafka_spark_conf(spark_service_name, keytab_secret=KEYTAB_SECRET): + return [ + "--conf", "spark.mesos.driver.secret.names={}".format(keytab_secret), + "--conf", "spark.mesos.driver.secret.filenames=kafka-client.keytab", + "--conf", "spark.mesos.executor.secret.names={}".format(keytab_secret), + "--conf", "spark.mesos.executor.secret.filenames=kafka-client.keytab", + "--conf", "spark.mesos.task.labels=DCOS_SPACE:/{}".format(spark_service_name), + "--conf", "spark.executorEnv.KRB5_CONFIG_BASE64={}".format(KAFKA_KRB5), + "--conf", "spark.mesos.driverEnv.KRB5_CONFIG_BASE64={}".format(KAFKA_KRB5), + "--conf", "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=" + "/mnt/mesos/sandbox/spark-kafka-client-jaas.conf", + "--conf", "spark.executor.extraJavaOptions=" + 
"-Djava.security.auth.login.config=/mnt/mesos/sandbox/spark-kafka-client-jaas.conf", + ] + + +@pytest.fixture(scope='session') +def configure_security_kafka(): + yield from sdk_security.security_session(framework_name=KAFKA_SERVICE_NAME, + service_account=KAFKA_SERVICE_ACCOUNT, + secret=KAFKA_SERVICE_ACCOUNT_SECRET) + + +@pytest.fixture(scope='session') +def kerberized_kafka(configure_security_kafka, kerberos_options): + try: + additional_options = { + "service": { + "name": KAFKA_SERVICE_NAME, + "security": kerberos_options + }, + "kafka": { + "default_replication_factor": 3, + "num_partitions": 32 + } + } + + if sdk_utils.is_strict_mode(): + additional_options["service"]["service_account"] = KAFKA_SERVICE_ACCOUNT + additional_options["service"]["principal"] = KAFKA_SERVICE_ACCOUNT + additional_options["service"]["service_account_secret"] = KAFKA_SERVICE_ACCOUNT_SECRET + additional_options["service"]["secret_name"] = KAFKA_SERVICE_ACCOUNT_SECRET + + sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME) + sdk_install.install( + KAFKA_PACKAGE_NAME, + KAFKA_SERVICE_NAME, + DEFAULT_KAFKA_TASK_COUNT, + additional_options=additional_options, + timeout_seconds=30 * 60) + + yield + + finally: + sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME) diff --git a/tests/fixtures/kdc.py b/tests/fixtures/kdc.py new file mode 100644 index 00000000..4b7d007c --- /dev/null +++ b/tests/fixtures/kdc.py @@ -0,0 +1,85 @@ +import itertools +import pytest +import sdk_auth +import sdk_hosts + +from tests.fixtures.hdfs import HDFS_SERVICE_NAME, GENERIC_HDFS_USER_PRINCIPAL, ALICE_PRINCIPAL +from tests.fixtures.kafka import KAFKA_SERVICE_NAME + + +@pytest.fixture(scope='session') +def kerberos_env(): + try: + kafka_principals = build_kafka_principals() + hdfs_principals = build_hdfs_principals() + + kerberos_env = sdk_auth.KerberosEnvironment(persist=True) + kerberos_env.add_principals(kafka_principals + hdfs_principals) + kerberos_env.finalize() + yield kerberos_env + + finally: + kerberos_env.cleanup() + + +def build_kafka_principals(): + fqdn = "{service_name}.{host_suffix}".format(service_name=KAFKA_SERVICE_NAME, + host_suffix=sdk_hosts.AUTOIP_HOST_SUFFIX) + + brokers = ["kafka-0-broker", "kafka-1-broker", "kafka-2-broker"] + + principals = [] + for b in brokers: + principals.append("kafka/{instance}.{domain}@{realm}".format( + instance=b, + domain=fqdn, + realm=sdk_auth.REALM)) + + principals.append("client@{realm}".format(realm=sdk_auth.REALM)) + return principals + + +def build_hdfs_principals(): + primaries = ["hdfs", "HTTP"] + fqdn = "{service_name}.{host_suffix}".format(service_name=HDFS_SERVICE_NAME, + host_suffix=sdk_hosts.AUTOIP_HOST_SUFFIX) + instances = [ + "name-0-node", + "name-0-zkfc", + "name-1-node", + "name-1-zkfc", + "journal-0-node", + "journal-1-node", + "journal-2-node", + "data-0-node", + "data-1-node", + "data-2-node", + ] + principals = [] + for (instance, primary) in itertools.product(instances, primaries): + principals.append( + "{primary}/{instance}.{fqdn}@{REALM}".format( + primary=primary, + instance=instance, + fqdn=fqdn, + REALM=sdk_auth.REALM + ) + ) + principals.append(GENERIC_HDFS_USER_PRINCIPAL) + principals.append(ALICE_PRINCIPAL) + return principals + + +@pytest.fixture(scope='session') +def kerberos_options(kerberos_env): + return { + "kerberos": { + "enabled": True, + "kdc": { + "hostname": kerberos_env.get_host(), + "port": int(kerberos_env.get_port()) + }, + "keytab_secret": kerberos_env.get_keytab_path(), + "realm": kerberos_env.get_realm() + } + } diff 
--git a/tests/jobs/scala/build.sbt b/tests/jobs/scala/build.sbt index a3ed1fbc..51f154dd 100644 --- a/tests/jobs/scala/build.sbt +++ b/tests/jobs/scala/build.sbt @@ -1,4 +1,4 @@ -lazy val SparkVersion = sys.env.get("SPARK_VERSION").getOrElse("2.2.0") +lazy val SparkVersion = sys.env.getOrElse("SPARK_VERSION", "2.3.2") lazy val root = (project in file(".")) .settings( @@ -7,9 +7,11 @@ lazy val root = (project in file(".")) scalaVersion := "2.11.8", libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % SparkVersion % "provided", - "org.apache.spark" % "spark-streaming_2.11" % SparkVersion % "provided", + "org.apache.spark" %% "spark-streaming" % SparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % SparkVersion % "provided", "org.apache.spark" %% "spark-mllib" % SparkVersion % "provided", - "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % SparkVersion, + "org.apache.spark" %% "spark-streaming-kafka-0-10" % SparkVersion, + "org.apache.spark" %% "spark-sql-kafka-0-10" % SparkVersion, "org.apache.hadoop" % "hadoop-aws" % "2.6.0", "org.apache.kafka" % "kafka_2.11" % "0.10.0.1", "com.github.scopt" %% "scopt" % "3.7.0", diff --git a/tests/jobs/scala/src/main/scala/KafkaJobs.scala b/tests/jobs/scala/src/main/scala/KafkaJobs.scala index 5c5a8ffa..bac95cd2 100644 --- a/tests/jobs/scala/src/main/scala/KafkaJobs.scala +++ b/tests/jobs/scala/src/main/scala/KafkaJobs.scala @@ -1,11 +1,9 @@ import java.util import scala.collection.mutable -import scala.util.Random - +import scala.util.{Failure, Random, Success, Try} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringDeserializer - import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kafka010._ @@ -68,7 +66,12 @@ object KafkaFeeder { p.foreach { r => val d = r.toString() val msg = new ProducerRecord[String, String](topic, null, d) - producer.send(msg) + println(s"sending message: $msg") + + Try(producer.send(msg)) match { + case Success(_) => println("Message sent") + case Failure(ex) => ex.printStackTrace() + } } producer.close() } @@ -167,7 +170,12 @@ object KafkaConsumer { ConsumerStrategies.Subscribe[String, String](Array(topic), props)) val lines = messages.map(_.value) - val words = lines.flatMap(_.split(" ")) + + val words = lines.flatMap(line => { + println(s"Received line: $line") + line.split(" ") + }) + val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.foreachRDD { (rdd: RDD[(String, Long)], time: Time) => val totalCount = WordAccumulator.getInstance(rdd.sparkContext) diff --git a/tests/jobs/scala/src/main/scala/KerberizedKafkaProducer.scala b/tests/jobs/scala/src/main/scala/KerberizedKafkaProducer.scala new file mode 100644 index 00000000..913a7e1f --- /dev/null +++ b/tests/jobs/scala/src/main/scala/KerberizedKafkaProducer.scala @@ -0,0 +1,59 @@ +import java.util.{HashMap => JMap} + +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} + +import scala.util.{Failure, Success, Try} + +/** + * A simple Kafka Producer which should be used in the same way as kafka-console-producer with Kerberized + * Kafka instance. The idea here is that producer runs as a driver without the need to launch executors. 
+ */ +object KerberizedKafkaProducer { + def main(args: Array[String]): Unit = { + if (args.length < 4) { + throw new IllegalArgumentException("USAGE: [message1, message2, ...]") + } + + val Array(service, brokers, topic) = args.take(3) + val messages = args.drop(3) + println(s"Got brokers $brokers, producing ${messages.length} to topic $topic") + try { + withProducer(service, brokers, sendMessages(topic, messages)) + } catch { + case t: Throwable => + t.printStackTrace() + } + + println(s"${messages.length} messages sent to Kafka") + } + + def withProducer(service: String, brokers: String, f: KafkaProducer[Int, String] => _): Unit = { + val producer = getKafkaProducer(service, brokers) + f(producer) + producer.close() + } + + def sendMessages(topic: String, messages: Array[String])(producer: KafkaProducer[Int, String]): Unit = { + messages.foreach { str => + val msg = new ProducerRecord[Int, String](topic, str.hashCode, str) + println(s"sending message: $msg") + + Try(producer.send(msg)) match { + case Success(_) => println("Message sent") + case Failure(ex) => ex.printStackTrace() + } + } + } + + def getKafkaProducer(service: String, brokers: String): KafkaProducer[Int, String] = { + val properties = new JMap[String, Object]() + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer") + properties.put("sasl.kerberos.service.name", service) + properties.put("security.protocol", "SASL_PLAINTEXT") + properties.put("sasl.mechanism", "GSSAPI") + + new KafkaProducer[Int, String](properties) + } +} diff --git a/tests/jobs/scala/src/main/scala/StructuredStreamingWithCheckpointing.scala b/tests/jobs/scala/src/main/scala/StructuredStreamingWithCheckpointing.scala new file mode 100644 index 00000000..ae4a1d8a --- /dev/null +++ b/tests/jobs/scala/src/main/scala/StructuredStreamingWithCheckpointing.scala @@ -0,0 +1,58 @@ +import org.apache.spark.sql.SparkSession + +/** + * Sample Spark Structured Streaming application that consumes data from Kafka (Kerberos supported) + * and outputs count of unique message bodies parsed as Strings to STDOUT + */ +object StructuredStreamingWithCheckpointing { + def main(args: Array[String]): Unit = { + + println("Starting Kafka Structured Streaming Application") + println("Creating Spark Context") + val spark = SparkSession + .builder + .appName("StructuredStreamingWithCheckpointing") + .getOrCreate() + + println("Spark Context Created") + + spark.sparkContext.setLogLevel("DEBUG") + + import spark.implicits._ + + println(s"args.length: ${args.length}, args:") + args.foreach(println) + println() + + var dataStreamReader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", args(0)) + .option("kafka.session.timeout.ms", 10000) + .option("kafka.max.poll.records", 500) + .option("subscribe", args(1)) + .option("startingOffsets", "earliest") + .option("checkpointLocation", args(2)) + + if (args.length == 4 && args(3).nonEmpty){ + println(s"kafka.security.protocol: ${args(3)}") + dataStreamReader = dataStreamReader.option("kafka.security.protocol", args(3)) + } else { + println("kafka.security.protocol is not specified") + } + + val query = dataStreamReader + .load() + .selectExpr("CAST(value AS STRING)") + .as[String] + .groupBy("value") + .count() + .writeStream + .option("checkpointLocation", args(2)) 
+ .outputMode("complete") + .format("console") + .start() + + query.awaitTermination() + } +} diff --git a/tests/test_checkpointing.py b/tests/test_checkpointing.py new file mode 100644 index 00000000..dbafe7c0 --- /dev/null +++ b/tests/test_checkpointing.py @@ -0,0 +1,115 @@ +import logging + +import pytest +import sdk_auth +import sdk_cmd +import sdk_tasks +import shakedown +import spark_utils as utils + +from tests.fixtures.hdfs import SPARK_SUBMIT_HDFS_KERBEROS_ARGS, HDFS_CLIENT_ID, GENERIC_HDFS_USER_PRINCIPAL +from tests.fixtures.hdfs import hdfs_cmd +from tests.fixtures.kafka import KAFKA_SERVICE_NAME, KAFKA_PACKAGE_NAME +from tests.fixtures.kafka import get_kerberized_kafka_spark_conf, upload_jaas + +LOGGER = logging.getLogger(__name__) +HDFS_CHECKPOINT_DIR = "hdfs://hdfs/checkpoints" +HDFS_TMP_DIR = "/tmp" +KAFKA_TEST_TOPIC = "streaming_test" +SPARK_APPLICATION_NAME = "StructuredStreamingWithCheckpointing" +SPARK_SECURITY_PROTOCOL = "SASL_PLAINTEXT" + + +def setup_hdfs_paths(): + sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL) + hdfs_cmd("rm -r -skipTrash {}".format(HDFS_CHECKPOINT_DIR)) + hdfs_cmd("mkdir {}".format(HDFS_CHECKPOINT_DIR)) + hdfs_cmd("chmod 1777 {}".format(HDFS_CHECKPOINT_DIR)) + + hdfs_cmd("rm -r -skipTrash {}".format(HDFS_TMP_DIR)) + hdfs_cmd("mkdir {}".format(HDFS_TMP_DIR)) + hdfs_cmd("chmod 1777 {}".format(HDFS_TMP_DIR)) + + +def feed_sample_data(jar_uri, kafka_brokers, topic, common_args, messages): + producer_args = ["--class", "KerberizedKafkaProducer"] + common_args + producer_id = utils.submit_job(app_url=jar_uri, + app_args="{} {} {} {}".format("kafka", kafka_brokers, topic, ' '.join(messages)), + service_name=utils.SPARK_SERVICE_NAME, + args=producer_args) + + # validating producer output + utils.check_job_output(producer_id, "{} messages sent to Kafka".format(len(messages))) + + +@pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false') +@pytest.mark.sanity +def test_structured_streaming_recovery(kerberized_spark, kerberized_kafka): + kafka_brokers = ','.join(sdk_cmd.svc_cli(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME, 'endpoints broker', json=True)['dns']) + LOGGER.info("Kafka brokers: {}".format(kafka_brokers)) + + _uri = upload_jaas() + uris = "spark.mesos.uris={}".format(_uri) + + jar_uri = utils.upload_dcos_test_jar() + + kafka_kerberos_args = get_kerberized_kafka_spark_conf(utils.SPARK_SERVICE_NAME) + LOGGER.info("Spark Kerberos configuration for Kafka:\n{}".format('\n'.join(kafka_kerberos_args))) + + common_args = [ + "--conf", "spark.mesos.containerizer=mesos", + "--conf", "spark.scheduler.maxRegisteredResourcesWaitingTime=2400s", + "--conf", "spark.scheduler.minRegisteredResourcesRatio=1.0", + "--conf", uris + ] + kafka_kerberos_args + + # configuring streaming job and HDFS folders + setup_hdfs_paths() + + # running kafka producer + message_set_a = ["abc"] * 100 + feed_sample_data(jar_uri, kafka_brokers, KAFKA_TEST_TOPIC, common_args, message_set_a) + + spark_submit_args = ["--supervise", + "--class", "StructuredStreamingWithCheckpointing", + "--conf", "spark.cores.max=2", + "--conf", "spark.executor.cores=1", + "--conf", "spark.sql.shuffle.partitions=2", + "--conf", "spark.executor.memory=2g"] + common_args + + application_args = "{} {} {} {}".format(kafka_brokers, KAFKA_TEST_TOPIC, HDFS_CHECKPOINT_DIR, SPARK_SECURITY_PROTOCOL) + + driver_task_id = utils.submit_job(app_url=jar_uri, + app_args=application_args, + service_name=utils.SPARK_SERVICE_NAME, + 
args=(SPARK_SUBMIT_HDFS_KERBEROS_ARGS + spark_submit_args)) + + # Wait until executor is running + LOGGER.info("Starting supervised driver {}".format(driver_task_id)) + sdk_tasks.check_running(SPARK_APPLICATION_NAME, expected_task_count=1, timeout_seconds=600) + + # validating Structured Streaming topic consumption + expected_output_a = "{}| {}".format(message_set_a[0], len(message_set_a)) + LOGGER.info("Validating Structured Streaming topic consumption, waiting for output {}".format(expected_output_a)) + utils.wait_for_running_job_output(driver_task_id, expected_output_a) + + # killing the driver + service_info = shakedown.get_service(SPARK_APPLICATION_NAME).dict() + driver_regex = "spark.mesos.driver.frameworkId={}".format(service_info['id']) + shakedown.kill_process_on_host(hostname=service_info['hostname'], pattern=driver_regex) + + # sending more data to Kafka + message_set_b = ["def"] * 100 + feed_sample_data(jar_uri, kafka_brokers, KAFKA_TEST_TOPIC, common_args + kafka_kerberos_args, message_set_b) + + # checkpointing validation + sdk_tasks.check_running(SPARK_APPLICATION_NAME, expected_task_count=1, timeout_seconds=600) + LOGGER.info("Streaming job has re-started") + + # validating Structured Streaming resumed topic consumption + expected_output_b = "{}| {}".format(message_set_b[0], len(message_set_b)) + LOGGER.info("Validating that consumption resumed from checkpoint, waiting for output '{}' and '{}'" + .format(expected_output_a, expected_output_b)) + + utils.wait_for_running_job_output(driver_task_id, expected_output_a) + utils.wait_for_running_job_output(driver_task_id, expected_output_b) diff --git a/tests/test_hdfs.py b/tests/test_hdfs.py index 51b5dd6d..5145a8be 100644 --- a/tests/test_hdfs.py +++ b/tests/test_hdfs.py @@ -1,238 +1,25 @@ -import base64 -import itertools import json import logging -import os import pytest import retrying - -import shakedown - import sdk_auth import sdk_cmd -import sdk_hosts import sdk_install import sdk_marathon -import sdk_security import sdk_tasks - -from tests import hdfs_auth +import shakedown import spark_utils as utils - log = logging.getLogger(__name__) -DEFAULT_HDFS_TASK_COUNT = 10 -GENERIC_HDFS_USER_PRINCIPAL = "hdfs@{realm}".format(realm=sdk_auth.REALM) -ALICE_USER = "alice" -ALICE_PRINCIPAL = "{user}@{realm}".format(user=ALICE_USER, realm=sdk_auth.REALM) -KEYTAB_SECRET_PATH = os.getenv("KEYTAB_SECRET_PATH", "__dcos_base64___keytab") - -HDFS_PACKAGE_NAME = 'hdfs' -HDFS_SERVICE_NAME = 'hdfs' -HISTORY_PACKAGE_NAME = os.getenv("HISTORY_PACKAGE_NAME", "spark-history") -HISTORY_SERVICE_NAME = os.getenv("HISTORY_SERVICE_NAME", "spark-history") - -HDFS_DATA_DIR = '/users/{}'.format(ALICE_USER) -HDFS_HISTORY_DIR = '/history' - -HDFS_KRB5_CONF_ORIG = '''[libdefaults] -default_realm = %(realm)s -dns_lookup_realm = true -dns_lookup_kdc = true -udp_preference_limit = 1 - -[realms] - %(realm)s = { - kdc = kdc.marathon.mesos:2500 - } - -[domain_realm] - .hdfs.dcos = %(realm)s - hdfs.dcos = %(realm)s -''' % {"realm": sdk_auth.REALM} # avoid format() due to problems with "{" in string -HDFS_KRB5_CONF = base64.b64encode(HDFS_KRB5_CONF_ORIG.encode('utf8')).decode('utf8') - -KERBEROS_ARGS = ["--kerberos-principal", ALICE_PRINCIPAL, - "--keytab-secret-path", "/{}".format(KEYTAB_SECRET_PATH), - "--conf", "spark.mesos.driverEnv.SPARK_USER={}".format(utils.SPARK_USER)] -HDFS_CLIENT_ID = "hdfsclient" - - -@pytest.fixture(scope='module') -def configure_security_hdfs(): - yield from sdk_security.security_session(HDFS_SERVICE_NAME) - - 
-@pytest.fixture(scope='module') -def hdfs_with_kerberos(configure_security_hdfs): - try: - primaries = ["hdfs", "HTTP"] - fqdn = "{service_name}.{host_suffix}".format( - service_name=HDFS_SERVICE_NAME, host_suffix=sdk_hosts.AUTOIP_HOST_SUFFIX) - instances = [ - "name-0-node", - "name-0-zkfc", - "name-1-node", - "name-1-zkfc", - "journal-0-node", - "journal-1-node", - "journal-2-node", - "data-0-node", - "data-1-node", - "data-2-node", - ] - principals = [] - for (instance, primary) in itertools.product(instances, primaries): - principals.append( - "{primary}/{instance}.{fqdn}@{REALM}".format( - primary=primary, - instance=instance, - fqdn=fqdn, - REALM=sdk_auth.REALM - ) - ) - principals.append(GENERIC_HDFS_USER_PRINCIPAL) - principals.append(ALICE_PRINCIPAL) - - kerberos_env = sdk_auth.KerberosEnvironment() - kerberos_env.add_principals(principals) - kerberos_env.finalize() - service_kerberos_options = { - "service": { - "name": HDFS_SERVICE_NAME, - "security": { - "kerberos": { - "enabled": True, - "kdc": { - "hostname": kerberos_env.get_host(), - "port": int(kerberos_env.get_port()) - }, - "keytab_secret": kerberos_env.get_keytab_path(), - "realm": kerberos_env.get_realm() - } - } - }, - "hdfs": { - "security_auth_to_local": hdfs_auth.get_principal_to_user_mapping() - } - } - - sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME) - sdk_install.install( - HDFS_PACKAGE_NAME, - HDFS_SERVICE_NAME, - DEFAULT_HDFS_TASK_COUNT, - additional_options=service_kerberos_options, - timeout_seconds=30*60) - - yield kerberos_env - - finally: - sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME) - kerberos_env.cleanup() - - -@pytest.fixture(scope='module') -def configure_security_spark(): - yield from utils.spark_security_session() - - -@pytest.fixture(scope='module') -def setup_hdfs_client(hdfs_with_kerberos): - try: - curr_dir = os.path.dirname(os.path.realpath(__file__)) - app_def_path = "{}/resources/hdfsclient.json".format(curr_dir) - with open(app_def_path) as f: - hdfsclient_app_def = json.load(f) - hdfsclient_app_def["id"] = HDFS_CLIENT_ID - hdfsclient_app_def["secrets"]["hdfs_keytab"]["source"] = KEYTAB_SECRET_PATH - sdk_marathon.install_app(hdfsclient_app_def) - - sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL) - hdfs_cmd("rm -r -skipTrash {}".format(HDFS_DATA_DIR)) - hdfs_cmd("mkdir -p {}".format(HDFS_DATA_DIR)) - hdfs_cmd("chown {}:users {}".format(ALICE_USER, HDFS_DATA_DIR)) - yield - - finally: - sdk_marathon.destroy_app(HDFS_CLIENT_ID) - - -def hdfs_cmd(cmd): - rc, _, _ = sdk_cmd.marathon_task_exec(HDFS_CLIENT_ID, "bin/hdfs dfs -{}".format(cmd)) - if rc != 0: - raise Exception("HDFS command failed with code {}: {}".format(rc, cmd)) - - -@pytest.fixture(scope='module') -def setup_history_server(hdfs_with_kerberos, setup_hdfs_client, configure_universe): - try: - sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL) - hdfs_cmd("rm -r -skipTrash {}".format(HDFS_HISTORY_DIR)) - hdfs_cmd("mkdir {}".format(HDFS_HISTORY_DIR)) - hdfs_cmd("chmod 1777 {}".format(HDFS_HISTORY_DIR)) - - sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME) - sdk_install.install( - HISTORY_PACKAGE_NAME, - HISTORY_SERVICE_NAME, - 0, - additional_options={ - "service": { - "name": HISTORY_SERVICE_NAME, - "user": utils.SPARK_HISTORY_USER, - "log-dir": "hdfs://hdfs{}".format(HDFS_HISTORY_DIR), - "hdfs-config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints" - .format(HDFS_SERVICE_NAME) - }, - 
"security": { - "kerberos": { - "enabled": True, - "krb5conf": HDFS_KRB5_CONF, - "principal": GENERIC_HDFS_USER_PRINCIPAL, - "keytab": KEYTAB_SECRET_PATH - } - } - }, - wait_for_deployment=False, # no deploy plan - insert_strict_options=False) # no standard service_account/etc options - yield - - finally: - sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME) - - -@pytest.fixture(scope='module', autouse=True) -def setup_spark(hdfs_with_kerberos, setup_history_server, configure_security_spark, configure_universe): - try: - additional_options = { - "hdfs": { - "config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints".format(HDFS_SERVICE_NAME) - }, - "security": { - "kerberos": { - "enabled": True, - "realm": sdk_auth.REALM, - "kdc": { - "hostname": hdfs_with_kerberos.get_host(), - "port": int(hdfs_with_kerberos.get_port()) - } - } - }, - "service": { - "spark-history-server-url": shakedown.dcos_url_path("/service/{}".format(HISTORY_SERVICE_NAME)) - } - } - utils.require_spark(additional_options=additional_options) - yield - finally: - utils.teardown_spark() +from tests.fixtures.hdfs import HDFS_SERVICE_NAME, HDFS_DATA_DIR, HDFS_HISTORY_DIR +from tests.fixtures.hdfs import HISTORY_PACKAGE_NAME +from tests.fixtures.hdfs import SPARK_SUBMIT_HDFS_KERBEROS_ARGS, KEYTAB_SECRET_PATH, GENERIC_HDFS_USER_PRINCIPAL def _run_terasort_job(terasort_class, app_args, expected_output): jar_url = 'https://downloads.mesosphere.io/spark/examples/spark-terasort-1.1-jar-with-dependencies_2.11.jar' - submit_args = ["--class", terasort_class] + KERBEROS_ARGS + submit_args = ["--class", terasort_class] + SPARK_SUBMIT_HDFS_KERBEROS_ARGS utils.run_tests(app_url=jar_url, app_args=" ".join(app_args), expected_output=expected_output, @@ -241,7 +28,7 @@ def _run_terasort_job(terasort_class, app_args, expected_output): @pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false') @pytest.mark.sanity -def test_terasort_suite(): +def test_terasort_suite(kerberized_spark, hdfs_with_kerberos): data_dir = "hdfs://{}".format(HDFS_DATA_DIR) terasort_in = "{}/{}".format(data_dir, "terasort_in") terasort_out = "{}/{}".format(data_dir, "terasort_out") @@ -267,7 +54,7 @@ def test_supervise(): @retrying.retry( wait_fixed=1000, - stop_max_delay=600*1000, + stop_max_delay=600 * 1000, retry_on_result=lambda res: not res) def wait_job_present(present): svc = shakedown.get_service(job_service_name) @@ -285,7 +72,7 @@ def wait_job_present(present): driver_id = utils.submit_job(app_url=utils.SPARK_EXAMPLES, app_args="10.0.0.1 9090 {dir}/netcheck {dir}/outfile".format(dir=data_dir), service_name=utils.SPARK_SERVICE_NAME, - args=(KERBEROS_ARGS + job_args)) + args=(SPARK_SUBMIT_HDFS_KERBEROS_ARGS + job_args)) log.info("Started supervised driver {}".format(driver_id)) wait_job_present(True) log.info("Job has registered") @@ -324,7 +111,7 @@ def wait_job_present(present): @pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED is false') @pytest.mark.sanity -def test_history(): +def test_history(kerberized_spark, hdfs_with_kerberos, setup_history_server): job_args = ["--class", "org.apache.spark.examples.SparkPi", "--conf", "spark.eventLog.enabled=true", "--conf", "spark.eventLog.dir=hdfs://hdfs{}".format(HDFS_HISTORY_DIR)] @@ -332,12 +119,12 @@ def test_history(): app_args="100", expected_output="Pi is roughly 3", service_name="spark", - args=(job_args + KERBEROS_ARGS)) + args=(job_args + SPARK_SUBMIT_HDFS_KERBEROS_ARGS)) @pytest.mark.skipif(not utils.hdfs_enabled(), reason='HDFS_ENABLED 
is false') @pytest.mark.sanity -def test_history_kdc_config(hdfs_with_kerberos): +def test_history_kdc_config(hdfs_with_kerberos, kerberos_env): history_service_with_kdc_config = "spark-history-with-kdc-config" try: # This deployment will fail if kerberos is not configured properly. @@ -358,8 +145,8 @@ def test_history_kdc_config(hdfs_with_kerberos): "kerberos": { "enabled": True, "kdc": { - "hostname": hdfs_with_kerberos.get_host(), - "port": int(hdfs_with_kerberos.get_port()) + "hostname": kerberos_env.get_host(), + "port": int(kerberos_env.get_port()) }, "realm": sdk_auth.REALM, "principal": GENERIC_HDFS_USER_PRINCIPAL, @@ -367,8 +154,8 @@ def test_history_kdc_config(hdfs_with_kerberos): } } }, - wait_for_deployment=False, # no deploy plan - insert_strict_options=False) # no standard service_account/etc options + wait_for_deployment=False, # no deploy plan + insert_strict_options=False) # no standard service_account/etc options finally: - sdk_marathon.destroy_app(history_service_with_kdc_config) + sdk_marathon.destroy_app(history_service_with_kdc_config) \ No newline at end of file diff --git a/tests/test_kafka.py b/tests/test_kafka.py index 8223aea3..bfecd070 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -1,104 +1,23 @@ -import base64 import logging import os import pytest - -import spark_s3 as s3 -import spark_utils as utils - -import sdk_auth import sdk_cmd -import sdk_hosts -import sdk_install import sdk_tasks -import sdk_security +import spark_utils as utils +from tests.fixtures.kafka import KERBERIZED_KAFKA, KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME, KEYTAB_SECRET +from tests.fixtures.kafka import get_kerberized_kafka_spark_conf, upload_jaas LOGGER = logging.getLogger(__name__) THIS_DIR = os.path.dirname(os.path.abspath(__file__)) -PRODUCER_SERVICE_NAME = "Spark->Kafka Producer" - -DEFAULT_KAFKA_TASK_COUNT=3 -KERBERIZED_KAFKA = True -KAFKA_KRB5_ORIG = b'''[libdefaults] -default_realm = LOCAL - -[realms] - LOCAL = { - kdc = kdc.marathon.autoip.dcos.thisdcos.directory:2500 - } -''' -KAFKA_KRB5 = base64.b64encode(KAFKA_KRB5_ORIG).decode('utf8') - -KAFKA_PACKAGE_NAME = os.getenv("KAFKA_PACKAGE_NAME", "kafka") -KAFKA_SERVICE_NAME = os.getenv("KAFKA_SERVICE_NAME", ("secure-kafka" if KERBERIZED_KAFKA else "kafka")) - - -@pytest.fixture(scope='module') -def configure_security_kafka(): - yield from sdk_security.security_session(KAFKA_SERVICE_NAME) - - -@pytest.fixture(scope='module') -def kerberized_kafka(configure_security_kafka): - try: - fqdn = "{service_name}.{host_suffix}".format(service_name=KAFKA_SERVICE_NAME, - host_suffix=sdk_hosts.AUTOIP_HOST_SUFFIX) - - brokers = ["kafka-0-broker", "kafka-1-broker", "kafka-2-broker"] - - principals = [] - for b in brokers: - principals.append("kafka/{instance}.{domain}@{realm}".format( - instance=b, - domain=fqdn, - realm=sdk_auth.REALM)) - - principals.append("client@{realm}".format(realm=sdk_auth.REALM)) - - kerberos_env = sdk_auth.KerberosEnvironment() - kerberos_env.add_principals(principals) - kerberos_env.finalize() - - service_kerberos_options = { - "service": { - "name": KAFKA_SERVICE_NAME, - "security": { - "kerberos": { - "enabled": True, - "kdc": { - "hostname": kerberos_env.get_host(), - "port": int(kerberos_env.get_port()) - }, - "keytab_secret": kerberos_env.get_keytab_path(), - "realm": kerberos_env.get_realm() - } - } - } - } - - sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME) - sdk_install.install( - KAFKA_PACKAGE_NAME, - KAFKA_SERVICE_NAME, - DEFAULT_KAFKA_TASK_COUNT, - 
additional_options=service_kerberos_options, - timeout_seconds=30 * 60) - - yield kerberos_env - - finally: - sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME) - kerberos_env.cleanup() - -@pytest.fixture(scope='module') +@pytest.fixture(scope='session') def configure_security_spark(): yield from utils.spark_security_session() -@pytest.fixture(scope='module', autouse=True) +@pytest.fixture(scope='module') def setup_spark(kerberized_kafka, configure_security_spark, configure_universe): try: utils.upload_dcos_test_jar() @@ -111,13 +30,13 @@ def setup_spark(kerberized_kafka, configure_security_spark, configure_universe): @pytest.mark.sanity @pytest.mark.smoke @pytest.mark.skipif(not utils.kafka_enabled(), reason='KAFKA_ENABLED is false') -def test_spark_and_kafka(): +def test_spark_and_kafka(setup_spark): kerberos_flag = "true" if KERBERIZED_KAFKA else "false" # flag for using kerberized kafka given to app stop_count = "48" # some reasonable number test_pipeline( kerberos_flag=kerberos_flag, jar_uri=utils.dcos_test_jar_url(), - keytab_secret="__dcos_base64___keytab", + keytab_secret=KEYTAB_SECRET, stop_count=stop_count, spark_service_name=utils.SPARK_SERVICE_NAME) @@ -136,9 +55,7 @@ def test_pipeline(kerberos_flag, stop_count, jar_uri, keytab_secret, spark_servi uris = "spark.mesos.uris={}".format(big_file_url) if kerberized and jaas_uri is None: - jaas_path = os.path.join(THIS_DIR, "resources", "spark-kafka-client-jaas.conf") - s3.upload_file(jaas_path) - _uri = s3.http_url("spark-kafka-client-jaas.conf") + _uri = upload_jaas() uris += ",{}".format(_uri) else: uris += ",{}".format(jaas_uri) @@ -150,21 +67,9 @@ def test_pipeline(kerberos_flag, stop_count, jar_uri, keytab_secret, spark_servi "--conf", uris ] - kerberos_args = [ - "--conf", "spark.mesos.driver.secret.names={}".format(keytab_secret), - "--conf", "spark.mesos.driver.secret.filenames=kafka-client.keytab", - "--conf", "spark.mesos.executor.secret.names={}".format(keytab_secret), - "--conf", "spark.mesos.executor.secret.filenames=kafka-client.keytab", - "--conf", "spark.mesos.task.labels=DCOS_SPACE:/{}".format(spark_service_name), - "--conf", "spark.executorEnv.KRB5_CONFIG_BASE64={}".format(KAFKA_KRB5), - "--conf", "spark.mesos.driverEnv.KRB5_CONFIG_BASE64={}".format(KAFKA_KRB5), - "--conf", "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=" - "/mnt/mesos/sandbox/spark-kafka-client-jaas.conf", - "--conf", "spark.executor.extraJavaOptions=" - "-Djava.security.auth.login.config=/mnt/mesos/sandbox/spark-kafka-client-jaas.conf", - ] + kerberos_args = get_kerberized_kafka_spark_conf(spark_service_name) - producer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=2", + producer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=1", "--class", "KafkaFeeder"] + common_args if kerberized: @@ -177,7 +82,8 @@ def test_pipeline(kerberos_flag, stop_count, jar_uri, keytab_secret, spark_servi sdk_tasks.check_running(KAFKA_SERVICE_NAME, 1, timeout_seconds=600) - consumer_config = ["--conf", "spark.cores.max=4", "--class", "KafkaConsumer"] + common_args + consumer_config = ["--conf", "spark.cores.max=2", "--conf", "spark.executor.cores=1", + "--class", "KafkaConsumer"] + common_args if kerberized: consumer_config += kerberos_args diff --git a/tests/test_spark.py b/tests/test_spark.py index 729cd529..5a845ec4 100644 --- a/tests/test_spark.py +++ b/tests/test_spark.py @@ -32,7 +32,7 @@ CNI_TEST_NUM_EXECUTORS = 1 -@pytest.fixture(scope='module') +@pytest.fixture(scope='session') 
 def configure_security():
     yield from utils.spark_security_session()
 
@@ -411,8 +411,9 @@ def verify_ip_is_reachable(ip):
 
 @pytest.mark.sanity
 def test_task_stdout():
+    service_name = utils.FOLDERED_SPARK_SERVICE_NAME
+
     try:
-        service_name = utils.FOLDERED_SPARK_SERVICE_NAME
         task_id = service_name.lstrip("/").replace("/", "_")
 
         utils.require_spark(service_name=service_name)
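
Note on the new polling helper: wait_for_running_job_output in spark-testing/spark_utils.py uses the retrying package to re-read a running driver's log until an expected line appears, instead of checking once and failing. A minimal standalone sketch of the same pattern; fetch_task_log is a placeholder callable (for example, a wrapper around "dcos task log --lines=1000 <task_id>") and is not part of this PR.

import logging

import retrying

LOGGER = logging.getLogger(__name__)


@retrying.retry(
    wait_fixed=5000,                          # wait 5 seconds between attempts
    stop_max_delay=600 * 1000,                # give up after 10 minutes
    retry_on_result=lambda found: not found)  # retry while the line is still absent
def wait_for_log_line(fetch_task_log, task_id, expected_line):
    """Poll the task's stdout until expected_line shows up, then return True."""
    stdout = fetch_task_log(task_id)
    found = expected_line in stdout
    LOGGER.info("Looking for %r in task %s stdout: %s", expected_line, task_id, found)
    return found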
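
Note on fixture sharing: tests/conftest.py now registers the modules under tests/fixtures/ via pytest_plugins, so their fixtures (kerberos_env, kerberized_kafka, and hdfs_with_kerberos are session-scoped; kerberized_spark is module-scoped) resolve in any test module without imports. A hypothetical consumer, sketched under that assumption; the test module and test name below are not part of this PR.

# Hypothetical test module relying on the shared fixtures from tests/fixtures/.
import pytest


@pytest.mark.sanity
def test_streaming_against_shared_clusters(kerberized_spark, kerberized_kafka):
    # kerberized_kafka (and the HDFS/KDC fixtures it builds on) is session-scoped,
    # so Kafka, HDFS, and the KDC are installed once per pytest session and torn
    # down at the end; kerberized_spark is module-scoped, so Spark is configured
    # once per test module. The test body only submits work against them.
    pass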
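
Note on the checkpointing test: test_structured_streaming_recovery submits the new StructuredStreamingWithCheckpointing job with --supervise, kills the driver process, feeds more Kafka messages, and then expects the restarted driver to resume message counts from the HDFS checkpoint rather than starting over. A rough PySpark equivalent of the query that Scala job runs, assuming the spark-sql-kafka connector is available; the broker address, topic, and checkpoint path below are placeholders and the Kerberos options are omitted.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("StructuredStreamingWithCheckpointing")
         .getOrCreate())

# Read raw Kafka records; startingOffsets=earliest mirrors the Scala job.
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker-0:9092")  # placeholder broker
          .option("subscribe", "streaming_test")                # placeholder topic
          .option("startingOffsets", "earliest")
          .load())

# Count occurrences of each distinct message body, as in the Scala query.
counts = (stream.selectExpr("CAST(value AS STRING)")
          .groupBy("value")
          .count())

# The checkpoint location is what lets a restarted driver resume the running
# counts instead of recomputing them from scratch.
query = (counts.writeStream
         .option("checkpointLocation", "hdfs://hdfs/checkpoints")  # placeholder path
         .outputMode("complete")
         .format("console")
         .start())

query.awaitTermination()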