Add delete method for security group rules

Adds new API method for deleting a security group rule.

This conforms to the newly agreed upon pattern of returning True
if the resource was deleted, or False if it was not found and
could not be deleted.

Change-Id: I9c66aee078fa999a85336f600f086b93640ca96e
This commit is contained in:
David Shrewsbury 2015-06-15 15:51:47 -04:00
parent 36ffb13f28
commit f6c0f808fa
3 changed files with 112 additions and 0 deletions

View File

@ -32,6 +32,7 @@ from keystoneclient import exceptions as keystone_exceptions
from keystoneclient import session as ksc_session
from novaclient import client as nova_client
from novaclient import exceptions as nova_exceptions
from neutronclient.common import exceptions as neutron_exceptions
from neutronclient.v2_0 import client as neutron_client
import os_client_config
import os_client_config.defaults
@ -2704,6 +2705,59 @@ class OpenStackCloud(object):
"Unavailable feature: security groups"
)
def delete_security_group_rule(self, rule_id):
"""Delete a security group rule
:param string rule_id: The unique ID of the security group rule.
:returns: True if delete succeeded, False otherwise.
:raises: OpenStackCloudException on operation error.
:raises: OpenStackCloudUnavailableFeature if security groups are
not supported on this cloud.
"""
if self.secgroup_source == 'neutron':
try:
self.manager.submitTask(
_tasks.NeutronSecurityGroupRuleDelete(
security_group_rule=rule_id)
)
except neutron_exceptions.NotFound:
return False
except Exception as e:
self.log.debug(
"neutron failed to delete security group rule {id}".format(
id=rule_id),
exc_info=True)
raise OpenStackCloudException(
"failed to delete security group rule {id}: {msg}".format(
id=rule_id, msg=str(e)))
return True
elif self.secgroup_source == 'nova':
try:
self.manager.submitTask(
_tasks.NovaSecurityGroupRuleDelete(rule=rule_id)
)
except nova_exceptions.NotFound:
return False
except Exception as e:
self.log.debug(
"nova failed to delete security group rule {id}".format(
id=rule_id),
exc_info=True)
raise OpenStackCloudException(
"failed to delete security group rule {id}: {msg}".format(
id=rule_id, msg=str(e)))
return True
# Security groups not supported
else:
raise OpenStackCloudUnavailableFeature(
"Unavailable feature: security groups"
)
class OperatorCloud(OpenStackCloud):
"""Represent a privileged/operator connection to an OpenStack Cloud.

View File

@ -217,6 +217,11 @@ class NeutronSecurityGroupRuleCreate(task_manager.Task):
return client.neutron_client.create_security_group_rule(**self.args)
class NeutronSecurityGroupRuleDelete(task_manager.Task):
def main(self, client):
return client.neutron_client.delete_security_group_rule(**self.args)
class NovaSecurityGroupList(task_manager.Task):
def main(self, client):
return client.nova_client.security_groups.list()
@ -242,6 +247,11 @@ class NovaSecurityGroupRuleCreate(task_manager.Task):
return client.nova_client.security_group_rules.create(**self.args)
class NovaSecurityGroupRuleDelete(task_manager.Task):
def main(self, client):
return client.nova_client.security_group_rules.delete(**self.args)
# TODO: Do this with neutron instead of nova if possible
class FloatingIPList(task_manager.Task):
def main(self, client):

View File

@ -16,6 +16,9 @@
import copy
import mock
from novaclient import exceptions as nova_exc
from neutronclient.common import exceptions as neutron_exc
import shade
from shade import meta
from shade.tests.unit import base
@ -255,3 +258,48 @@ class TestSecurityGroups(base.TestCase):
'')
self.assertFalse(mock_neutron.create_security_group.called)
self.assertFalse(mock_nova.security_groups.create.called)
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
def test_delete_security_group_rule_neutron(self, mock_neutron):
self.cloud.secgroup_source = 'neutron'
r = self.cloud.delete_security_group_rule('xyz')
mock_neutron.delete_security_group_rule.assert_called_once_with(
security_group_rule='xyz')
self.assertTrue(r)
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_delete_security_group_rule_nova(self, mock_nova):
self.cloud.secgroup_source = 'nova'
r = self.cloud.delete_security_group_rule('xyz')
mock_nova.security_group_rules.delete.assert_called_once_with(
rule='xyz')
self.assertTrue(r)
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_delete_security_group_rule_none(self, mock_nova, mock_neutron):
self.cloud.secgroup_source = None
self.assertRaises(shade.OpenStackCloudUnavailableFeature,
self.cloud.delete_security_group_rule,
'')
self.assertFalse(mock_neutron.create_security_group.called)
self.assertFalse(mock_nova.security_groups.create.called)
@mock.patch.object(shade.OpenStackCloud, 'neutron_client')
@mock.patch.object(shade.OpenStackCloud, 'nova_client')
def test_delete_security_group_rule_not_found(self,
mock_nova,
mock_neutron):
self.cloud.secgroup_source = 'neutron'
mock_neutron.delete_security_group_rule.side_effect = (
neutron_exc.NotFound()
)
r = self.cloud.delete_security_group('doesNotExist')
self.assertFalse(r)
self.cloud.secgroup_source = 'nova'
mock_neutron.security_group_rules.delete.side_effect = (
nova_exc.NotFound("uh oh")
)
r = self.cloud.delete_security_group('doesNotExist')
self.assertFalse(r)