Merge "Update tempest for neutron driver attach/detach security group action"
This commit is contained in:
commit
b1ba5935e1
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest import clients
|
||||
from tempest.common import utils
|
||||
from tempest import config
|
||||
|
@ -25,6 +26,7 @@ from congress_tempest_plugin.tests.scenario import helper
|
|||
from congress_tempest_plugin.tests.scenario import manager_congress
|
||||
|
||||
CONF = config.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
|
||||
|
@ -390,6 +392,51 @@ class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
|
|||
raise exceptions.TimeoutException("Data did not converge in time "
|
||||
"or failure in server")
|
||||
|
||||
@decorators.attr(type='smoke')
|
||||
def test_neutronv2_attach_detach_port_security_group(self):
|
||||
self.network, self.subnet, self.router = self.create_networks()
|
||||
self.check_networks()
|
||||
# first create a test port with exactly 1 (default) security group
|
||||
post_body = {
|
||||
"port_security_enabled": True,
|
||||
"network_id": self.network['id']}
|
||||
body = self.ports_client.create_port(**post_body)
|
||||
test_port = body['port']
|
||||
self.addCleanup(self.ports_client.delete_port, test_port['id'])
|
||||
|
||||
# test detach and re-attach
|
||||
test_policy = self._create_random_policy()
|
||||
|
||||
# use rules to detach group
|
||||
self._create_policy_rule(
|
||||
test_policy,
|
||||
'execute[neutronv2:detach_port_security_group("%s", "%s")] '
|
||||
':- p(1)' % (
|
||||
test_port['id'], test_port['security_groups'][0]))
|
||||
self._create_policy_rule(test_policy, 'p(1)')
|
||||
|
||||
def _check_data(num_sec_grps):
|
||||
updated_port = self.ports_client.show_port(test_port['id'])
|
||||
return len(updated_port['port']['security_groups']) == num_sec_grps
|
||||
|
||||
if not test_utils.call_until_true(func=lambda: _check_data(0),
|
||||
duration=30, sleep_for=1):
|
||||
raise exceptions.TimeoutException("Security group did not detach "
|
||||
"within allotted time.")
|
||||
|
||||
# use rules to attach group
|
||||
self._create_policy_rule(
|
||||
test_policy,
|
||||
'execute[neutronv2:attach_port_security_group("%s", "%s")] '
|
||||
':- p(2)' % (
|
||||
test_port['id'], test_port['security_groups'][0]))
|
||||
self._create_policy_rule(test_policy, 'p(2)')
|
||||
|
||||
if not test_utils.call_until_true(func=lambda: _check_data(1),
|
||||
duration=30, sleep_for=1):
|
||||
raise exceptions.TimeoutException("Security group did not attach "
|
||||
"within allotted time.")
|
||||
|
||||
@decorators.attr(type='smoke')
|
||||
def test_update_no_error(self):
|
||||
if not test_utils.call_until_true(
|
||||
|
|
|
@ -14,7 +14,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import collections
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest.common import credentials_factory as credentials
|
||||
|
@ -258,3 +260,29 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
|
|||
"floating-ip {src}".format(dest=remote_ip,
|
||||
src=floating_ip))
|
||||
raise
|
||||
|
||||
def _create_random_policy(self):
|
||||
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
|
||||
for x in range(10))
|
||||
body = {"name": policy_name}
|
||||
resp = self.os_admin.congress_client.create_policy(body)
|
||||
self.addCleanup(self.os_admin.congress_client.delete_policy,
|
||||
resp['id'])
|
||||
return resp['name']
|
||||
|
||||
def _create_policy_rule(self, policy_name, rule, rule_name=None,
|
||||
comment=None):
|
||||
body = {'rule': rule}
|
||||
if rule_name:
|
||||
body['name'] = rule_name
|
||||
if comment:
|
||||
body['comment'] = comment
|
||||
client = self.os_admin.congress_client
|
||||
response = client.create_policy_rule(policy_name, body)
|
||||
if response:
|
||||
self.addCleanup(client.delete_policy_rule, policy_name,
|
||||
response['id'])
|
||||
return response
|
||||
else:
|
||||
raise Exception('Failed to create policy rule (%s, %s)'
|
||||
% (policy_name, rule))
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
|
||||
from tempest.common import utils
|
||||
|
@ -27,6 +25,7 @@ from tempest.lib import exceptions
|
|||
from congress_tempest_plugin.tests.scenario import helper
|
||||
from congress_tempest_plugin.tests.scenario import manager_congress
|
||||
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
|
@ -47,32 +46,6 @@ class TestPolicyBasicOps(manager_congress.ScenarioPolicyBase):
|
|||
self.keypairs = {}
|
||||
self.servers = []
|
||||
|
||||
def _create_random_policy(self):
|
||||
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
|
||||
for x in range(10))
|
||||
body = {"name": policy_name}
|
||||
resp = self.os_admin.congress_client.create_policy(body)
|
||||
self.addCleanup(self.os_admin.congress_client.delete_policy,
|
||||
resp['id'])
|
||||
return resp['name']
|
||||
|
||||
def _create_policy_rule(self, policy_name, rule, rule_name=None,
|
||||
comment=None):
|
||||
body = {'rule': rule}
|
||||
if rule_name:
|
||||
body['name'] = rule_name
|
||||
if comment:
|
||||
body['comment'] = comment
|
||||
client = self.os_admin.congress_client
|
||||
response = client.create_policy_rule(policy_name, body)
|
||||
if response:
|
||||
self.addCleanup(client.delete_policy_rule, policy_name,
|
||||
response['id'])
|
||||
return response
|
||||
else:
|
||||
raise Exception('Failed to create policy rule (%s, %s)'
|
||||
% (policy_name, rule))
|
||||
|
||||
def _create_test_server(self, name=None):
|
||||
image_ref = CONF.compute.image_ref
|
||||
flavor_ref = CONF.compute.flavor_ref
|
||||
|
|
Loading…
Reference in New Issue