diff --git a/congress_tempest_plugin/tests/scenario/congress_datasources/test_neutronv2.py b/congress_tempest_plugin/tests/scenario/congress_datasources/test_neutronv2.py index a741166..712953e 100644 --- a/congress_tempest_plugin/tests/scenario/congress_datasources/test_neutronv2.py +++ b/congress_tempest_plugin/tests/scenario/congress_datasources/test_neutronv2.py @@ -14,6 +14,7 @@ # under the License. import time +from oslo_log import log as logging from tempest import clients from tempest.common import utils from tempest import config @@ -25,6 +26,7 @@ from congress_tempest_plugin.tests.scenario import helper from congress_tempest_plugin.tests.scenario import manager_congress CONF = config.CONF +LOG = logging.getLogger(__name__) class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase): @@ -390,6 +392,51 @@ class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase): raise exceptions.TimeoutException("Data did not converge in time " "or failure in server") + @decorators.attr(type='smoke') + def test_neutronv2_attach_detach_port_security_group(self): + self.network, self.subnet, self.router = self.create_networks() + self.check_networks() + # first create a test port with exactly 1 (default) security group + post_body = { + "port_security_enabled": True, + "network_id": self.network['id']} + body = self.ports_client.create_port(**post_body) + test_port = body['port'] + self.addCleanup(self.ports_client.delete_port, test_port['id']) + + # test detach and re-attach + test_policy = self._create_random_policy() + + # use rules to detach group + self._create_policy_rule( + test_policy, + 'execute[neutronv2:detach_port_security_group("%s", "%s")] ' + ':- p(1)' % ( + test_port['id'], test_port['security_groups'][0])) + self._create_policy_rule(test_policy, 'p(1)') + + def _check_data(num_sec_grps): + updated_port = self.ports_client.show_port(test_port['id']) + return len(updated_port['port']['security_groups']) == num_sec_grps + + if not test_utils.call_until_true(func=lambda: _check_data(0), + duration=30, sleep_for=1): + raise exceptions.TimeoutException("Security group did not detach " + "within allotted time.") + + # use rules to attach group + self._create_policy_rule( + test_policy, + 'execute[neutronv2:attach_port_security_group("%s", "%s")] ' + ':- p(2)' % ( + test_port['id'], test_port['security_groups'][0])) + self._create_policy_rule(test_policy, 'p(2)') + + if not test_utils.call_until_true(func=lambda: _check_data(1), + duration=30, sleep_for=1): + raise exceptions.TimeoutException("Security group did not attach " + "within allotted time.") + @decorators.attr(type='smoke') def test_update_no_error(self): if not test_utils.call_until_true( diff --git a/congress_tempest_plugin/tests/scenario/manager_congress.py b/congress_tempest_plugin/tests/scenario/manager_congress.py index 52ca8c0..e43b0a7 100755 --- a/congress_tempest_plugin/tests/scenario/manager_congress.py +++ b/congress_tempest_plugin/tests/scenario/manager_congress.py @@ -14,7 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. import collections +import random import re +import string from oslo_log import log as logging from tempest.common import credentials_factory as credentials @@ -258,3 +260,29 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest): "floating-ip {src}".format(dest=remote_ip, src=floating_ip)) raise + + def _create_random_policy(self): + policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase) + for x in range(10)) + body = {"name": policy_name} + resp = self.os_admin.congress_client.create_policy(body) + self.addCleanup(self.os_admin.congress_client.delete_policy, + resp['id']) + return resp['name'] + + def _create_policy_rule(self, policy_name, rule, rule_name=None, + comment=None): + body = {'rule': rule} + if rule_name: + body['name'] = rule_name + if comment: + body['comment'] = comment + client = self.os_admin.congress_client + response = client.create_policy_rule(policy_name, body) + if response: + self.addCleanup(client.delete_policy_rule, policy_name, + response['id']) + return response + else: + raise Exception('Failed to create policy rule (%s, %s)' + % (policy_name, rule)) diff --git a/congress_tempest_plugin/tests/scenario/test_congress_basic_ops.py b/congress_tempest_plugin/tests/scenario/test_congress_basic_ops.py index 6de5604..065bc07 100644 --- a/congress_tempest_plugin/tests/scenario/test_congress_basic_ops.py +++ b/congress_tempest_plugin/tests/scenario/test_congress_basic_ops.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import random -import string import time from tempest.common import utils @@ -27,6 +25,7 @@ from tempest.lib import exceptions from congress_tempest_plugin.tests.scenario import helper from congress_tempest_plugin.tests.scenario import manager_congress + CONF = config.CONF @@ -47,32 +46,6 @@ class TestPolicyBasicOps(manager_congress.ScenarioPolicyBase): self.keypairs = {} self.servers = [] - def _create_random_policy(self): - policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase) - for x in range(10)) - body = {"name": policy_name} - resp = self.os_admin.congress_client.create_policy(body) - self.addCleanup(self.os_admin.congress_client.delete_policy, - resp['id']) - return resp['name'] - - def _create_policy_rule(self, policy_name, rule, rule_name=None, - comment=None): - body = {'rule': rule} - if rule_name: - body['name'] = rule_name - if comment: - body['comment'] = comment - client = self.os_admin.congress_client - response = client.create_policy_rule(policy_name, body) - if response: - self.addCleanup(client.delete_policy_rule, policy_name, - response['id']) - return response - else: - raise Exception('Failed to create policy rule (%s, %s)' - % (policy_name, rule)) - def _create_test_server(self, name=None): image_ref = CONF.compute.image_ref flavor_ref = CONF.compute.flavor_ref