Add unittest for destroy cluster command
Add a unittest for the refactored function that destroys a cluster. Relies on the
mock and moto dependencies to avoid sending out real EC2 requests.
Luis Osa committed Mar 13, 2014
commit 2e7ca995dce6a8822fed4389be4d69223bd671d2
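
As a quick illustration of what the commit message means by relying on mock and moto (a sketch, not part of the patch): the moto.mock_ec2 decorator redirects boto's EC2 API calls to an in-memory backend, so nothing is sent to real AWS. The helper name count_instances below is hypothetical.

    import moto
    from boto import ec2

    @moto.mock_ec2
    def count_instances():
        # Served by moto's fake backend, not by AWS.
        conn = ec2.connect_to_region("us-east-1")
        return len(conn.get_all_instances())  # 0: the fake account starts empty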
109 changes: 57 additions & 52 deletions ec2/spark_ec2.py
@@ -37,6 +37,8 @@
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2

AWS_EVENTUAL_CONSISTENCY = 30

class UsageError(Exception):
    pass

@@ -223,7 +225,7 @@ def launch_cluster(conn, opts, cluster_name):
        sys.exit(1)
    if opts.key_pair is None:
        print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
        sys.exit(1)
        sys.exit(1)
    print "Setting up security groups..."
    master_group = get_or_make_group(conn, cluster_name + "-master")
    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
@@ -459,7 +461,6 @@ def setup_spark_cluster(master, opts):
    if opts.ganglia:
        print "Ganglia started at http://%s:5080/ganglia" % master


# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
    print "Waiting for instances to start up..."
@@ -665,6 +666,59 @@ def get_partition(total, num_partitions, current_partitions):
    return num_slaves_this_zone


def destroy_cluster(conn, opts, cluster_name):
    (master_nodes, slave_nodes) = get_existing_cluster(
        conn, opts, cluster_name, die_on_error=False)
    print "Terminating master..."
    for inst in master_nodes:
        inst.terminate()
    print "Terminating slaves..."
    for inst in slave_nodes:
        inst.terminate()

    # Delete security groups as well
    if opts.delete_groups:
        print "Deleting security groups (this will take some time)..."
        group_names = [cluster_name + "-master", cluster_name + "-slaves"]

        attempt = 1;
        while attempt <= 3:
            print "Attempt %d" % attempt
            groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
            success = True
            # Delete individual rules in all groups before deleting groups to
            # remove dependencies between them
            for group in groups:
                print "Deleting rules in security group " + group.name
                for rule in group.rules:
                    for grant in rule.grants:
                        success &= group.revoke(ip_protocol=rule.ip_protocol,
                                                from_port=rule.from_port,
                                                to_port=rule.to_port,
                                                src_group=grant)

            # Sleep for AWS eventual-consistency to catch up, and for instances
            # to terminate
            time.sleep(AWS_EVENTUAL_CONSISTENCY)  # Yes, it does have to be this long :-(
            for group in groups:
                try:
                    conn.delete_security_group(group.name)
                    print "Deleted security group " + group.name
                except boto.exception.EC2ResponseError:
                    success = False;
                    print "Failed to delete security group " + group.name

            # Unfortunately, group.revoke() returns True even if a rule was not
            # deleted, so this needs to be rerun if something fails
            if success: break;

            attempt += 1

        if not success:
            print "Failed to delete all security groups after 3 tries."
            print "Try re-running in a few minutes."


def real_main():
    (opts, action, cluster_name) = parse_args()
    try:
@@ -695,56 +749,7 @@ def real_main():
            cluster_name + "?\nALL DATA ON ALL NODES WILL BE LOST!!\n" +
            "Destroy cluster " + cluster_name + " (y/N): ")
        if response == "y":
            (master_nodes, slave_nodes) = get_existing_cluster(
                conn, opts, cluster_name, die_on_error=False)
            print "Terminating master..."
            for inst in master_nodes:
                inst.terminate()
            print "Terminating slaves..."
            for inst in slave_nodes:
                inst.terminate()

            # Delete security groups as well
            if opts.delete_groups:
                print "Deleting security groups (this will take some time)..."
                group_names = [cluster_name + "-master", cluster_name + "-slaves"]

                attempt = 1;
                while attempt <= 3:
                    print "Attempt %d" % attempt
                    groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
                    success = True
                    # Delete individual rules in all groups before deleting groups to
                    # remove dependencies between them
                    for group in groups:
                        print "Deleting rules in security group " + group.name
                        for rule in group.rules:
                            for grant in rule.grants:
                                success &= group.revoke(ip_protocol=rule.ip_protocol,
                                                        from_port=rule.from_port,
                                                        to_port=rule.to_port,
                                                        src_group=grant)

                    # Sleep for AWS eventual-consistency to catch up, and for instances
                    # to terminate
                    time.sleep(30)  # Yes, it does have to be this long :-(
                    for group in groups:
                        try:
                            conn.delete_security_group(group.name)
                            print "Deleted security group " + group.name
                        except boto.exception.EC2ResponseError:
                            success = False;
                            print "Failed to delete security group " + group.name

                    # Unfortunately, group.revoke() returns True even if a rule was not
                    # deleted, so this needs to be rerun if something fails
                    if success: break;

                    attempt += 1

                if not success:
                    print "Failed to delete all security groups after 3 tries."
                    print "Try re-running in a few minutes."
            destroy_cluster(conn, opts, cluster_name)

    elif action == "login":
        (master_nodes, slave_nodes) = get_existing_cluster(
25 changes: 25 additions & 0 deletions ec2/tests.py
@@ -0,0 +1,25 @@
import unittest
Contributor: This should probably include

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

and definitely include the Apache license header.

Author: Done
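
For reference, the header being requested would presumably be the shebang and coding lines quoted above plus the standard ASF license header used in other Spark Python files, along the lines of the sketch below; the exact wording should be copied from an existing file in the Spark tree rather than from here.

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.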


from boto import ec2
import mock
import moto

import spark_ec2

Contributor: Extra newline.

Author: The extra newline before classes is required by the flake8 tool in its default settings, which is what I am using to check Python style for this script. If you tell me this is not your enforced style, I will change it. However, if you do not have an enforced style, please allow me to keep it and suggest the flake8 defaults as the style standard.

Contributor: Actually, I check code in pyspark as well, and we do follow the flake8 default of an extra newline before classes, so keeping this is absolutely fine.
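
(For context on the tool discussed above: flake8 is run from the command line against the files being checked, roughly as follows, assuming it has been installed with pip.)

    pip install flake8
    flake8 ec2/spark_ec2.py ec2/tests.py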


Contributor: Can you also add some class-level comments and instructions on how to run this test here?

Author: Done

class CommandTests(unittest.TestCase):

    @moto.mock_ec2
    def test_destroy(self):
        spark_ec2.AWS_EVENTUAL_CONSISTENCY = 1
        opts = mock.MagicMock(name='opts')
        opts.region = "us-east-1"
        conn = ec2.connect_to_region(opts.region)
        cluster_name = "cluster_name"
        try:
            spark_ec2.destroy_cluster(conn, opts, cluster_name)
        except:
            self.fail("destroy_cluster raised unexpected exception")


if __name__ == '__main__':
    unittest.main()
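
The run instructions requested in the review would presumably boil down to something like the following (a sketch; it assumes the mock and moto packages are installed and that the test is run from the ec2/ directory so that spark_ec2 is importable):

    pip install mock moto
    cd ec2
    python tests.py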