2 changes: 2 additions & 0 deletions .gitignore
@@ -29,3 +29,5 @@ cluster_info.json
 dcos-launch-*
 env/
 venv/
+
+.DS_Store
4 changes: 2 additions & 2 deletions Makefile
@@ -115,10 +115,10 @@ stub-universe-url: docker-dist
 	fi

 # Special directive to assist in speeding up CI builds
-# Generates a master checksum against the source files in tests/jobs/scala/src/*
+# Generates a master checksum against the source files in tests/jobs/scala
 # This is later compared to avoid building identical jars from previous CI runs
 test-jar-checksum:
-	find tests/jobs/scala/src/* -type f -exec md5sum '{}' + | sort > checksums
+	find tests/jobs/scala -type f -exec md5sum '{}' + | sort > checksums
 	md5sum checksums | cut -d ' ' -f1 > test-jar-checksum

 MD5SUM ?= $(shell cat test-jar-checksum)
10 changes: 5 additions & 5 deletions manifest.json
@@ -1,17 +1,17 @@
 {
-  "spark_version": "2.2.1",
+  "spark_version": "2.3.2",
   "default_spark_dist": {
     "hadoop_version": "2.7",
-    "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz"
+    "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.7.7.tgz"
   },
   "spark_dist": [
     {
       "hadoop_version": "2.7",
-      "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.7.tgz"
+      "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.7.7.tgz"
     },
     {
       "hadoop_version": "2.6",
-      "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.2.1-4-SNAPSHOT-bin-2.6.tgz"
+      "uri": "https://downloads.mesosphere.com/spark/assets/spark-2.3.2-SNAPSHOT-2-bin-2.6.5.tgz"
     }
   ]
 }
12 changes: 12 additions & 0 deletions spark-testing/spark_utils.py
@@ -3,6 +3,7 @@
 import logging
 import os
 import re
+import retrying
 import urllib
 import urllib.parse

@@ -173,6 +174,17 @@ def check_job_output(task_id, expected_output):
         raise Exception("{} not found in stdout".format(expected_output))


+@retrying.retry(
+    wait_fixed=5000,
+    stop_max_delay=600 * 1000,
+    retry_on_result=lambda res: not res)
+def wait_for_running_job_output(task_id, expected_line):
+    stdout = sdk_cmd.run_cli("task log --lines=1000 {}".format(task_id))
+    result = expected_line in stdout
+    LOGGER.info('Checking for {} in STDOUT:\n{}\nResult: {}'.format(expected_line, stdout, result))
+    return result
+
+
 def upload_file(file_path):
     spark_s3.upload_file(file_path)
     return spark_s3.http_url(os.path.basename(file_path))
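Note on the retry semantics above: with the `retrying` package, `wait_fixed=5000` re-runs the wrapped call every 5 seconds, `stop_max_delay=600 * 1000` gives up after roughly 10 minutes, and `retry_on_result=lambda res: not res` keeps retrying while the function returns a falsy value. A minimal usage sketch (the task id below is a placeholder for illustration, not taken from this PR):

import spark_utils as utils

# Placeholder driver task id; a real test would take this from whatever
# helper submitted the Spark job.
driver_task_id = "driver-20190101000000-0001"

# Polls the driver's stdout until the marker line appears; raises
# retrying.RetryError if it never shows up within ~10 minutes.
utils.wait_for_running_job_output(driver_task_id, "Streaming context started")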
22 changes: 12 additions & 10 deletions testing/sdk_auth.py
@@ -195,7 +195,7 @@ def _install_marathon_app(app_definition):
             log.info("Found installed KDC app, reusing it")
             return _get_kdc_task(self.app_definition["id"])
         log.info("Found installed KDC app, destroying it first")
-        sdk_marathon.destroy(self.app_definition["id"])
+        sdk_marathon.destroy_app(self.app_definition["id"])

         log.info("Installing KDC Marathon app")
         _install_marathon_app(self.app_definition)
@@ -439,12 +439,14 @@ def cleanup(self):
         sdk_security.install_enterprise_cli()

         log.info("Removing the marathon KDC app")
-        sdk_marathon.destroy_app(self.app_definition["id"])
-
-        if self._temp_working_dir and isinstance(self._temp_working_dir, tempfile.TemporaryDirectory):
-            log.info("Deleting temporary working directory")
-            self._temp_working_dir.cleanup()
-
-        # TODO: separate secrets handling into another module
-        log.info("Deleting keytab secret")
-        sdk_security.delete_secret(self.keytab_secret_path)
+        if sdk_marathon.app_exists(self.app_definition["id"]):
+            sdk_marathon.destroy_app(self.app_definition["id"])
+            if self._temp_working_dir and isinstance(self._temp_working_dir, tempfile.TemporaryDirectory):
+                log.info("Deleting temporary working directory")
+                self._temp_working_dir.cleanup()
+
+            # TODO: separate secrets handling into another module
+            log.info("Deleting keytab secret")
+            sdk_security.delete_secret(self.keytab_secret_path)
+        else:
+            log.info("KDC app doesn't exist, skipping cleanup")
6 changes: 4 additions & 2 deletions testing/sdk_security.py
@@ -271,7 +271,7 @@ def cleanup_security(service_name: str,
     log.info("Finished cleaning up strict-mode security")


-def security_session(framework_name: str) -> None:
+def security_session(framework_name: str, service_account: str="service-acct", secret: str="secret") -> None:
     """Create a service account and configure permissions for strict-mode tests.

     This should generally be used as a fixture in a framework's conftest.py:
@@ -283,7 +283,9 @@ def configure_security(configure_universe):
     try:
         is_strict = sdk_utils.is_strict_mode()
         if is_strict:
-            service_account_info = setup_security(framework_name)
+            service_account_info = setup_security(service_name=framework_name,
+                                                  service_account=service_account,
+                                                  service_account_secret=secret)
         yield
     finally:
         if is_strict:
5 changes: 5 additions & 0 deletions tests/conftest.py
@@ -1,6 +1,11 @@
 import pytest
 import sdk_repository

+pytest_plugins = [
+    "tests.fixtures.hdfs",
+    "tests.fixtures.kafka",
+    "tests.fixtures.kdc"
+]

 @pytest.fixture(scope='session')
 def configure_universe():
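For context, `pytest_plugins` tells pytest to import the listed modules and register any fixtures they define, so the session-scoped fixtures in tests/fixtures/hdfs.py below (for example `hdfs_with_kerberos`) become available to every test module without explicit imports. As a rough, hypothetical illustration of the shape of such a plugin module (the actual kafka and kdc fixture files are not part of this excerpt):

# Hypothetical sketch of a fixture module wired in via pytest_plugins; names
# and task counts are illustrative only, not taken from this PR.
import pytest
import sdk_install

KAFKA_PACKAGE_NAME = 'kafka'
KAFKA_SERVICE_NAME = 'kafka'


@pytest.fixture(scope='session')
def kafka_service(configure_universe):
    try:
        sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME)
        sdk_install.install(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME, 3)
        yield
    finally:
        sdk_install.uninstall(KAFKA_PACKAGE_NAME, KAFKA_SERVICE_NAME)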
7 changes: 7 additions & 0 deletions tests/fixtures/__init__.py
@@ -0,0 +1,7 @@
import os.path
import sys

this_file_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'tests')))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'testing')))
sys.path.append(os.path.normpath(os.path.join(this_file_dir, '..', '..', 'spark-testing')))
186 changes: 186 additions & 0 deletions tests/fixtures/hdfs.py
@@ -0,0 +1,186 @@
import base64
import logging
import json
import os
import pytest
import sdk_auth
import sdk_cmd
import sdk_install
import sdk_marathon
import sdk_security
import sdk_utils
import shakedown
import spark_utils as utils

from tests import hdfs_auth

LOGGER = logging.getLogger(__name__)

DEFAULT_HDFS_TASK_COUNT = 10
GENERIC_HDFS_USER_PRINCIPAL = "hdfs@{realm}".format(realm=sdk_auth.REALM)
ALICE_USER = "alice"
ALICE_PRINCIPAL = "{user}@{realm}".format(user=ALICE_USER, realm=sdk_auth.REALM)
KEYTAB_SECRET_PATH = os.getenv("KEYTAB_SECRET_PATH", "__dcos_base64___keytab")

HDFS_PACKAGE_NAME = 'hdfs'
HDFS_SERVICE_NAME = 'hdfs'
HDFS_SERVICE_ACCOUNT = "{}-service-acct".format(HDFS_SERVICE_NAME)
HDFS_SERVICE_ACCOUNT_SECRET = "{}-secret".format(HDFS_SERVICE_NAME)
HISTORY_PACKAGE_NAME = os.getenv("HISTORY_PACKAGE_NAME", "spark-history")
HISTORY_SERVICE_NAME = os.getenv("HISTORY_SERVICE_NAME", "spark-history")

HDFS_DATA_DIR = '/users/{}'.format(ALICE_USER)
HDFS_HISTORY_DIR = '/history'

HDFS_KRB5_CONF_ORIG = '''[libdefaults]
default_realm = %(realm)s
dns_lookup_realm = true
dns_lookup_kdc = true
udp_preference_limit = 1

[realms]
%(realm)s = {
kdc = kdc.marathon.mesos:2500
}

[domain_realm]
.hdfs.dcos = %(realm)s
hdfs.dcos = %(realm)s
''' % {"realm": sdk_auth.REALM}  # avoid format() due to problems with "{" in string
HDFS_KRB5_CONF = base64.b64encode(HDFS_KRB5_CONF_ORIG.encode('utf8')).decode('utf8')

SPARK_SUBMIT_HDFS_KERBEROS_ARGS = ["--kerberos-principal", ALICE_PRINCIPAL,
                                   "--keytab-secret-path", "/{}".format(KEYTAB_SECRET_PATH),
                                   "--conf", "spark.mesos.driverEnv.SPARK_USER={}".format(utils.SPARK_USER)]

HDFS_CLIENT_ID = "hdfsclient"
SPARK_HISTORY_USER = "nobody"


@pytest.fixture(scope='session')
def configure_security_hdfs():
    yield from sdk_security.security_session(framework_name=HDFS_SERVICE_NAME,
                                             service_account=HDFS_SERVICE_ACCOUNT,
                                             secret=HDFS_SERVICE_ACCOUNT_SECRET)


@pytest.fixture(scope='session')
def hdfs_with_kerberos(configure_security_hdfs, kerberos_options):
    try:
        additional_options = {
            "service": {
                "name": HDFS_SERVICE_NAME,
                "security": kerberos_options
            },
            "hdfs": {
                "security_auth_to_local": hdfs_auth.get_principal_to_user_mapping()
            }
        }

        if sdk_utils.is_strict_mode():
            additional_options["service"]["service_account"] = HDFS_SERVICE_ACCOUNT
            additional_options["service"]["principal"] = HDFS_SERVICE_ACCOUNT
            additional_options["service"]["service_account_secret"] = HDFS_SERVICE_ACCOUNT_SECRET
            additional_options["service"]["secret_name"] = HDFS_SERVICE_ACCOUNT_SECRET

        sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME)
        sdk_install.install(
            HDFS_PACKAGE_NAME,
            HDFS_SERVICE_NAME,
            DEFAULT_HDFS_TASK_COUNT,
            additional_options=additional_options,
            timeout_seconds=30 * 60)

        yield

    finally:
        sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME)


@pytest.fixture(scope='session')
def setup_hdfs_client(hdfs_with_kerberos):
Review comment — Contributor:
It is strange that this depends on hdfs_with_kerberos without accessing it. Should the client depend on the service in some way?

Review comment — Contributor (PR author):
The main intent of this dependency is to enforce ordering, so that the hdfsclient app is never created before the HDFS service. What would be a better way of implementing this?

Review comment — Contributor:
I realise that it's about ordering, so that's fine, but the dependency is not really explicit here. It takes knowledge of the client itself to know that it requires an HDFS service named "hdfs".

What I have used in the past is something like: https://github.com/mesosphere/dcos-commons/blob/d83139ac0fb9c53719a9e5637a5704ae9dfe23d5/frameworks/hdfs/tests/test_kerberos_auth.py#L101

where hdfs_with_kerberos is a dict containing the required configuration, which is explicitly passed to the client. It's not a requirement for this PR, though.

    try:
        curr_dir = os.path.dirname(os.path.realpath(__file__))
        app_def_path = "{}/../resources/hdfsclient.json".format(curr_dir)
        with open(app_def_path) as f:
            hdfsclient_app_def = json.load(f)
        hdfsclient_app_def["id"] = HDFS_CLIENT_ID
        hdfsclient_app_def["secrets"]["hdfs_keytab"]["source"] = KEYTAB_SECRET_PATH
        sdk_marathon.install_app(hdfsclient_app_def)

        sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL)
        hdfs_cmd("rm -r -skipTrash {}".format(HDFS_DATA_DIR))
        hdfs_cmd("mkdir -p {}".format(HDFS_DATA_DIR))
        hdfs_cmd("chown {}:users {}".format(ALICE_USER, HDFS_DATA_DIR))
        yield

    finally:
        sdk_marathon.destroy_app(HDFS_CLIENT_ID)


def hdfs_cmd(cmd):
    rc, _, _ = sdk_cmd.marathon_task_exec(HDFS_CLIENT_ID, "bin/hdfs dfs -{}".format(cmd))
    if rc != 0:
        raise Exception("HDFS command failed with code {}: {}".format(rc, cmd))


@pytest.fixture(scope='session')
def configure_security_spark():
    yield from utils.spark_security_session()


@pytest.fixture(scope='session')
def setup_history_server(hdfs_with_kerberos, setup_hdfs_client, configure_universe):
    try:
        sdk_auth.kinit(HDFS_CLIENT_ID, keytab="hdfs.keytab", principal=GENERIC_HDFS_USER_PRINCIPAL)
        hdfs_cmd("rm -r -skipTrash {}".format(HDFS_HISTORY_DIR))
        hdfs_cmd("mkdir {}".format(HDFS_HISTORY_DIR))
        hdfs_cmd("chmod 1777 {}".format(HDFS_HISTORY_DIR))

        sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME)
        sdk_install.install(
            HISTORY_PACKAGE_NAME,
            HISTORY_SERVICE_NAME,
            0,
            additional_options={
                "service": {
                    "name": HISTORY_SERVICE_NAME,
                    "user": SPARK_HISTORY_USER,
                    "log-dir": "hdfs://hdfs{}".format(HDFS_HISTORY_DIR),
                    "hdfs-config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints"
                                       .format(HDFS_SERVICE_NAME)
                },
                "security": {
                    "kerberos": {
                        "enabled": True,
                        "krb5conf": HDFS_KRB5_CONF,
                        "principal": GENERIC_HDFS_USER_PRINCIPAL,
                        "keytab": KEYTAB_SECRET_PATH
                    }
                }
            },
            wait_for_deployment=False,  # no deploy plan
            insert_strict_options=False)  # no standard service_account/etc options
        yield

    finally:
        sdk_install.uninstall(HISTORY_PACKAGE_NAME, HISTORY_SERVICE_NAME)


@pytest.fixture(scope='module')
def kerberized_spark(setup_history_server, hdfs_with_kerberos, kerberos_options, configure_security_spark, configure_universe):
    try:
        additional_options = {
            "hdfs": {
                "config-url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints".format(HDFS_SERVICE_NAME)
Review comment — Contributor:
Note: I'm not a big fan of these global variables, and would rather access hdfs_with_kerberos to obtain these values. Not going to block on this, though, because I understand that it would be a larger refactoring.

Review comment — Contributor (PR author):
Totally agree, but I'd prefer not to mix the refactoring necessary to make the tests work with general code refactoring, even if it is needed. The integration test codebase definitely needs a few rounds of refactoring, so let's address that in separate PRs.

            },
            "security": kerberos_options,
            "service": {
                "spark-history-server-url": shakedown.dcos_url_path("/service/{}".format(HISTORY_SERVICE_NAME))
            }
        }

        utils.require_spark(additional_options=additional_options)
        yield
    finally:
        utils.teardown_spark()
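Picking up the review discussion above about implicit ordering and module-level globals: a minimal sketch of the dict-passing style the reviewer describes, where the HDFS fixture yields its own endpoint configuration and dependent fixtures read it explicitly instead of relying on shared constants. This is hypothetical and not part of this PR; it reuses the constants, imports, and helpers from tests/fixtures/hdfs.py above.

# Hypothetical refactoring sketch only, not part of this PR.
@pytest.fixture(scope='session')
def hdfs_with_kerberos(configure_security_hdfs, kerberos_options):
    try:
        sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME)
        sdk_install.install(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME, DEFAULT_HDFS_TASK_COUNT)
        # Yield the details dependents need instead of leaving them implicit.
        yield {
            "service_name": HDFS_SERVICE_NAME,
            "config_url": "http://api.{}.marathon.l4lb.thisdcos.directory/v1/endpoints".format(HDFS_SERVICE_NAME),
        }
    finally:
        sdk_install.uninstall(HDFS_PACKAGE_NAME, HDFS_SERVICE_NAME)


@pytest.fixture(scope='module')
def kerberized_spark(hdfs_with_kerberos, kerberos_options, configure_security_spark, configure_universe):
    try:
        # The HDFS endpoint comes from the fixture's yielded dict, so the
        # dependency on the running HDFS service is explicit.
        utils.require_spark(additional_options={
            "hdfs": {"config-url": hdfs_with_kerberos["config_url"]},
            "security": kerberos_options,
        })
        yield
    finally:
        utils.teardown_spark()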