Merge "Update tempest for neutron driver attach/detach security group action"

This commit is contained in:
Zuul 2018-01-09 09:15:22 +00:00 committed by Gerrit Code Review
commit b1ba5935e1
3 changed files with 76 additions and 28 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import time
from oslo_log import log as logging
from tempest import clients
from tempest.common import utils
from tempest import config
@ -25,6 +26,7 @@ from congress_tempest_plugin.tests.scenario import helper
from congress_tempest_plugin.tests.scenario import manager_congress
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
@ -390,6 +392,51 @@ class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
raise exceptions.TimeoutException("Data did not converge in time "
"or failure in server")
@decorators.attr(type='smoke')
def test_neutronv2_attach_detach_port_security_group(self):
self.network, self.subnet, self.router = self.create_networks()
self.check_networks()
# first create a test port with exactly 1 (default) security group
post_body = {
"port_security_enabled": True,
"network_id": self.network['id']}
body = self.ports_client.create_port(**post_body)
test_port = body['port']
self.addCleanup(self.ports_client.delete_port, test_port['id'])
# test detach and re-attach
test_policy = self._create_random_policy()
# use rules to detach group
self._create_policy_rule(
test_policy,
'execute[neutronv2:detach_port_security_group("%s", "%s")] '
':- p(1)' % (
test_port['id'], test_port['security_groups'][0]))
self._create_policy_rule(test_policy, 'p(1)')
def _check_data(num_sec_grps):
updated_port = self.ports_client.show_port(test_port['id'])
return len(updated_port['port']['security_groups']) == num_sec_grps
if not test_utils.call_until_true(func=lambda: _check_data(0),
duration=30, sleep_for=1):
raise exceptions.TimeoutException("Security group did not detach "
"within allotted time.")
# use rules to attach group
self._create_policy_rule(
test_policy,
'execute[neutronv2:attach_port_security_group("%s", "%s")] '
':- p(2)' % (
test_port['id'], test_port['security_groups'][0]))
self._create_policy_rule(test_policy, 'p(2)')
if not test_utils.call_until_true(func=lambda: _check_data(1),
duration=30, sleep_for=1):
raise exceptions.TimeoutException("Security group did not attach "
"within allotted time.")
@decorators.attr(type='smoke')
def test_update_no_error(self):
if not test_utils.call_until_true(

View File

@ -14,7 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import random
import re
import string
from oslo_log import log as logging
from tempest.common import credentials_factory as credentials
@ -258,3 +260,29 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
"floating-ip {src}".format(dest=remote_ip,
src=floating_ip))
raise
def _create_random_policy(self):
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
for x in range(10))
body = {"name": policy_name}
resp = self.os_admin.congress_client.create_policy(body)
self.addCleanup(self.os_admin.congress_client.delete_policy,
resp['id'])
return resp['name']
def _create_policy_rule(self, policy_name, rule, rule_name=None,
comment=None):
body = {'rule': rule}
if rule_name:
body['name'] = rule_name
if comment:
body['comment'] = comment
client = self.os_admin.congress_client
response = client.create_policy_rule(policy_name, body)
if response:
self.addCleanup(client.delete_policy_rule, policy_name,
response['id'])
return response
else:
raise Exception('Failed to create policy rule (%s, %s)'
% (policy_name, rule))

View File

@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import string
import time
from tempest.common import utils
@ -27,6 +25,7 @@ from tempest.lib import exceptions
from congress_tempest_plugin.tests.scenario import helper
from congress_tempest_plugin.tests.scenario import manager_congress
CONF = config.CONF
@ -47,32 +46,6 @@ class TestPolicyBasicOps(manager_congress.ScenarioPolicyBase):
self.keypairs = {}
self.servers = []
def _create_random_policy(self):
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
for x in range(10))
body = {"name": policy_name}
resp = self.os_admin.congress_client.create_policy(body)
self.addCleanup(self.os_admin.congress_client.delete_policy,
resp['id'])
return resp['name']
def _create_policy_rule(self, policy_name, rule, rule_name=None,
comment=None):
body = {'rule': rule}
if rule_name:
body['name'] = rule_name
if comment:
body['comment'] = comment
client = self.os_admin.congress_client
response = client.create_policy_rule(policy_name, body)
if response:
self.addCleanup(client.delete_policy_rule, policy_name,
response['id'])
return response
else:
raise Exception('Failed to create policy rule (%s, %s)'
% (policy_name, rule))
def _create_test_server(self, name=None):
image_ref = CONF.compute.image_ref
flavor_ref = CONF.compute.flavor_ref