Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 122 additions & 64 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import platform
import sys
import uuid
import multiprocessing as mp

from statistics import mean
from subprocess import run, Popen, PIPE, STDOUT
Expand Down Expand Up @@ -441,10 +442,11 @@ def podman_create(tags):

# Give failures a few tries as this load test is not always performed
# within production quality environments.
failures = 0
failure_count = 0
success_count = 0
max_failures = 3

while failures < max_failures:
while failure_count < max_failures:

# Call Podman to push the Dockerfile
cmd = [
Expand All @@ -466,13 +468,14 @@ def podman_create(tags):
try:
assert p.returncode == 0
success = True
success_count = success_count + 1
except Exception:
success = False
failures = failures + 1
failure_count = failure_count + 1
logger.info("Failed to push tag: %s" % tag)
logger.info("STDOUT: %s" % output)
logger.info("STDERR: %s" % errors)
logger.info("Retrying. %s/%s failures." % (failures, max_failures))
logger.info("Retrying. %s/%s failures." % (failure_count, max_failures))

# Statistics / Data
elapsed_time = end_time - start_time
Expand All @@ -481,7 +484,8 @@ def podman_create(tags):
'elapsed_time': elapsed_time.total_seconds(),
'start_time': start_time,
'end_time': end_time,
'failures': failures,
'failure_count': failure_count,
'success_count': success_count,
'successful': success,
}
results.append(data)
Expand All @@ -496,7 +500,7 @@ def podman_create(tags):
# Write data to Elasticsearch
logger.info("Writing 'registry push' results to Elasticsearch")
es = Elasticsearch([ES_HOST], port=ES_PORT)
index = 'quay-registry-push'
index = 'quay-push-results'
docs = []
for result in results:

Expand Down Expand Up @@ -569,10 +573,11 @@ def podman_pull(tags):
'--storage-driver', 'overlay',
]

failures = 0
failure_count = 0
success_count = 0
max_failures = 3

while failures < max_failures:
while failure_count < max_failures:

# Time the Push
start_time = datetime.datetime.utcnow()
Expand All @@ -584,13 +589,14 @@ def podman_pull(tags):
try:
assert p.returncode == 0
success = True
success_count = success_count + 1
except Exception:
success = False
failures = failures + 1
failure_count = failure_count + 1
logger.info("Failed to pull tag: %s" % tag)
logger.info("STDOUT: %s" % output)
logger.info("STDERR: %s" % errors)
logger.info("Retrying. %s/%s failures." % (failures, max_failures))
logger.info("Retrying. %s/%s failures." % (failure_count, max_failures))

# Statistics / Data
elapsed_time = end_time - start_time
Expand All @@ -599,7 +605,8 @@ def podman_pull(tags):
'elapsed_time': elapsed_time.total_seconds(),
'start_time': start_time,
'end_time': end_time,
'failures': failures,
'success_count': success_count,
'failure_count': failure_count,
'successful': success,
}
results.append(data)
Expand All @@ -613,9 +620,9 @@ def podman_pull(tags):
podman_clear_cache()

# Write data to Elasticsearch
logger.info("Writing 'registry push' results to Elasticsearch")
logger.info("Writing 'registry pull' results to Elasticsearch")
es = Elasticsearch([ES_HOST], port=ES_PORT)
index = 'quay-registry-pull'
index = 'quay-pull-results'
docs = []
for result in results:

Expand Down Expand Up @@ -664,7 +671,7 @@ def test_pull(num_tags):

tags = []
for n in range(num_tags):
tag = redis_client.lpop('tags_to_pull')
tag = redis_client.lpop('tags_to_pull'+"-".join(username.split("_")))
if tag:
tags.append(tag.decode('utf-8'))

Expand All @@ -688,7 +695,7 @@ def test_push(num_tags):

tags = []
for n in range(num_tags):
tag = redis_client.lpop('tags_to_push')
tag = redis_client.lpop('tags_to_push'+"-".join(username.split("_")))
if tag:
tags.append(tag.decode('utf-8'))

Expand Down Expand Up @@ -746,17 +753,17 @@ def create_test_push_job(namespace, quay_host, username, password, concurrency,
)

template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'quay-perf-test-component': 'executor'}),
metadata=client.V1ObjectMeta(labels={'quay-perf-test-component-push': 'executor-'+"-".join(username.split("_"))}),
spec=client.V1PodSpec(restart_policy='Never', containers=[container])
)

spec = client.V1JobSpec(template=template, backoff_limit=0,
parallelism=concurrency, completions=num_jobs)
parallelism=concurrency, completions=num_jobs, ttl_seconds_after_finished=120)

job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name="test-registry-push"),
metadata=client.V1ObjectMeta(name="test-registry-push"+"-".join(username.split("_"))),
spec=spec
)

Expand Down Expand Up @@ -815,17 +822,17 @@ def create_test_pull_job(namespace, quay_host, username, password, concurrency,
)

template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={'quay-perf-test-component': 'executor'}),
metadata=client.V1ObjectMeta(labels={'quay-perf-test-component-pull': 'executor-'+"-".join(username.split("_"))}),
spec=client.V1PodSpec(restart_policy='Never', containers=[container])
)

spec = client.V1JobSpec(template=template, backoff_limit=0,
parallelism=concurrency, completions=num_jobs)
parallelism=concurrency, completions=num_jobs, ttl_seconds_after_finished=120)

job = client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=client.V1ObjectMeta(name="test-registry-pull"),
metadata=client.V1ObjectMeta(name="test-registry-pull"+"-".join(username.split("_"))),
spec=spec
)

Expand All @@ -840,6 +847,82 @@ def create_test_pull_job(namespace, quay_host, username, password, concurrency,
logger.info("Created Job: %s", resp.metadata.name)


"""
This function is triggered using python multiprocessing to create push/pull jobs in parallel
with input concurrency specified. For example: If we input 10 users with concurrency 5, It will
create 5 push/pull jobs first and 5 push/pull jobs next in batches to process them. It uses
redis to store all the tags to be pushed for each user by appending the unique username at
the end of the tag key which is used as an unique identifier to fetch all the tags to be uploaded
to that specific user's account.

:param user: username
:param kwargs: args required to create jobs
:return: None
"""
def parallel_process(user, **kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you write a small description on how this function works and how you are using redis etc

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Added the description. Please let me know if you have any further questions on functionality.

common_args = kwargs
# Container Operations
redis_client.delete('tags_to_push'+"-".join(user.split("_"))) # avoid stale data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the parallel operations only for push/pull?

Copy link
Collaborator Author

@vishnuchalla vishnuchalla Apr 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. For rest of the APIS, we hit endpoints using vegeta. So the concurrency is automatically handled by the --rate option in the tool. Since we execute podman commands for push/pull we have to explicitly implement parallel processing for them as in the above code.

redis_client.rpush('tags_to_push'+"-".join(user.split("_")), *common_args['tags'])
logger.info('Queued %s tags to be created' % len(common_args['tags']))

redis_client.delete('tags_to_pull'+"-".join(user.split("_"))) # avoid stale data
redis_client.rpush('tags_to_pull'+"-".join(user.split("_")), *common_args['tags'])
logger.info('Queued %s tags to be pulled' % len(common_args['tags']))

# Start the Registry Push Test job
create_test_push_job(common_args['namespace'], common_args['quay_host'], user,
common_args['password'], common_args['concurrency'], common_args['uuid'], common_args['auth_token'],
common_args['batch_size'], len(common_args['tags']), common_args['push_pull_image'], common_args['target_hit_size'])
time.sleep(60) # Give the Job time to start
while True:
# Check Job Status
job_name = 'test-registry-push'+"-".join(user.split("_"))
job_api = client.BatchV1Api()
resp = job_api.read_namespaced_job_status(name=job_name, namespace=common_args['namespace'])
completion_time = resp.status.completion_time
if completion_time:
logger.info("Job %s has been completed." % (job_name))
break

# Log Queue Status
remaining = redis_client.llen('tags_to_push'+"-".join(user.split("_")))
logger.info('Waiting for %s to finish. Queue: %s/%s' % (job_name, remaining, len(common_args['tags'])))
time.sleep(60 * 1) # 1 minute

# Start the Registry Pull Test job
create_test_pull_job(common_args['namespace'], common_args['quay_host'], user,
common_args['password'], common_args['concurrency'], common_args['uuid'], common_args['auth_token'],
common_args['batch_size'], len(common_args['tags']), common_args['push_pull_image'], common_args['target_hit_size'])
time.sleep(60) # Give the Job time to start
while True:

# Check Job Status
job_name = 'test-registry-pull'+"-".join(user.split("_"))
job_api = client.BatchV1Api()
resp = job_api.read_namespaced_job_status(name=job_name, namespace=common_args['namespace'])
completion_time = resp.status.completion_time
if completion_time:
logger.info("Job %s has been completed." % (job_name))
break

# Log Queue Status
remaining = redis_client.llen('tags_to_pull'+"-".join(user.split("_")))
logger.info('Waiting for %s to finish. Queue: %s/%s' % (job_name, remaining, len(common_args['tags'])))
time.sleep(60 * 1) # 1 minute


def batch_process(users_chunk, batch_args):
jobs = []
for each_user in users_chunk:
process = mp.Process(target=parallel_process, args=(each_user,), kwargs=batch_args)
jobs.append(process)
process.start()

for proc in jobs:
proc.join()


if __name__ == '__main__':

config.load_incluster_config()
Expand Down Expand Up @@ -959,50 +1042,25 @@ def create_test_pull_job(namespace, quay_host, username, password, concurrency,
add_team_members(organization, teams, users)
add_teams_to_organization_repos(organization, repos, teams)

# Container Operations
redis_client.delete('tags_to_push') # avoid stale data
redis_client.rpush('tags_to_push', *tags)
logger.info('Queued %s tags to be created' % len(tags))

# Start the Registry Push Test job
create_test_push_job(namespace, QUAY_HOST, users[0], password, CONCURRENCY, TEST_UUID, AUTH_TOKEN, BATCH_SIZE, len(tags), PUSH_PULL_IMAGE, TARGET_HIT_SIZE)
time.sleep(60) # Give the Job time to start
while True:

# Check Job Status
job_api = client.BatchV1Api()
resp = job_api.read_namespaced_job_status(name='test-registry-push', namespace=namespace)
completion_time = resp.status.completion_time
if completion_time:
logger.info("Job 'test-registry-push' completed.")
break

# Log Queue Status
remaining = redis_client.llen('tags_to_push')
logger.info('Waiting for "test-registry-push" to finish. Queue: %s/%s' % (remaining, len(tags)))
time.sleep(60 * 5) # 5 minutes

redis_client.delete('tags_to_pull') # avoid stale data
redis_client.rpush('tags_to_pull', *tags)
logger.info('Queued %s tags to be pulled' % len(tags))

# Start the Registry Pull Test job
create_test_pull_job(namespace, QUAY_HOST, users[0], password, CONCURRENCY, TEST_UUID, AUTH_TOKEN, BATCH_SIZE, len(tags), PUSH_PULL_IMAGE, TARGET_HIT_SIZE)
time.sleep(60) # Give the Job time to start
while True:

# Check Job Status
job_api = client.BatchV1Api()
resp = job_api.read_namespaced_job_status(name='test-registry-pull', namespace=namespace)
completion_time = resp.status.completion_time
if completion_time:
logger.info("Job 'test-registry-pull' completed.")
break
batch_args = {
"namespace": namespace,
"quay_host": QUAY_HOST,
"password": password,
"concurrency": CONCURRENCY,
"uuid": TEST_UUID,
"auth_token": AUTH_TOKEN,
"batch_size": BATCH_SIZE,
"tags": tags,
"push_pull_image": PUSH_PULL_IMAGE,
"target_hit_size": TARGET_HIT_SIZE
}

# Log Queue Status
remaining = redis_client.llen('tags_to_pull')
logger.info('Waiting for "test-registry-pull" to finish. Queue: %s/%s' % (remaining, len(tags)))
time.sleep(60 * 5) # 5 minutes
users_copy = users[:]
while len(users_copy) > CONCURRENCY:
users_copy_chunck = users_copy[0: CONCURRENCY]
batch_process(users_copy_chunck, batch_args)
users_copy = users_copy[CONCURRENCY:]
batch_process(users_copy, batch_args)

# These tests should run *after* repositories contain images
get_catalog()
Expand Down