Update tempest for neutron driver attach/detach security group action
Depends-On: I51ec7452a5a7a77cc8cfc57b87e671685280ea73 Change-Id: I693da5f6f8f67de62daee24a5c7516abf1599a10
This commit is contained in:
parent
0cc1716003
commit
640b7b8f90
|
@ -14,6 +14,7 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
from tempest import clients
|
from tempest import clients
|
||||||
from tempest.common import utils
|
from tempest.common import utils
|
||||||
from tempest import config
|
from tempest import config
|
||||||
|
@ -25,6 +26,7 @@ from congress_tempest_plugin.tests.scenario import helper
|
||||||
from congress_tempest_plugin.tests.scenario import manager_congress
|
from congress_tempest_plugin.tests.scenario import manager_congress
|
||||||
|
|
||||||
CONF = config.CONF
|
CONF = config.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
|
class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
|
||||||
|
@ -390,6 +392,51 @@ class TestNeutronV2Driver(manager_congress.ScenarioPolicyBase):
|
||||||
raise exceptions.TimeoutException("Data did not converge in time "
|
raise exceptions.TimeoutException("Data did not converge in time "
|
||||||
"or failure in server")
|
"or failure in server")
|
||||||
|
|
||||||
|
@decorators.attr(type='smoke')
|
||||||
|
def test_neutronv2_attach_detach_port_security_group(self):
|
||||||
|
self.network, self.subnet, self.router = self.create_networks()
|
||||||
|
self.check_networks()
|
||||||
|
# first create a test port with exactly 1 (default) security group
|
||||||
|
post_body = {
|
||||||
|
"port_security_enabled": True,
|
||||||
|
"network_id": self.network['id']}
|
||||||
|
body = self.ports_client.create_port(**post_body)
|
||||||
|
test_port = body['port']
|
||||||
|
self.addCleanup(self.ports_client.delete_port, test_port['id'])
|
||||||
|
|
||||||
|
# test detach and re-attach
|
||||||
|
test_policy = self._create_random_policy()
|
||||||
|
|
||||||
|
# use rules to detach group
|
||||||
|
self._create_policy_rule(
|
||||||
|
test_policy,
|
||||||
|
'execute[neutronv2:detach_port_security_group("%s", "%s")] '
|
||||||
|
':- p(1)' % (
|
||||||
|
test_port['id'], test_port['security_groups'][0]))
|
||||||
|
self._create_policy_rule(test_policy, 'p(1)')
|
||||||
|
|
||||||
|
def _check_data(num_sec_grps):
|
||||||
|
updated_port = self.ports_client.show_port(test_port['id'])
|
||||||
|
return len(updated_port['port']['security_groups']) == num_sec_grps
|
||||||
|
|
||||||
|
if not test_utils.call_until_true(func=lambda: _check_data(0),
|
||||||
|
duration=30, sleep_for=1):
|
||||||
|
raise exceptions.TimeoutException("Security group did not detach "
|
||||||
|
"within allotted time.")
|
||||||
|
|
||||||
|
# use rules to attach group
|
||||||
|
self._create_policy_rule(
|
||||||
|
test_policy,
|
||||||
|
'execute[neutronv2:attach_port_security_group("%s", "%s")] '
|
||||||
|
':- p(2)' % (
|
||||||
|
test_port['id'], test_port['security_groups'][0]))
|
||||||
|
self._create_policy_rule(test_policy, 'p(2)')
|
||||||
|
|
||||||
|
if not test_utils.call_until_true(func=lambda: _check_data(1),
|
||||||
|
duration=30, sleep_for=1):
|
||||||
|
raise exceptions.TimeoutException("Security group did not attach "
|
||||||
|
"within allotted time.")
|
||||||
|
|
||||||
@decorators.attr(type='smoke')
|
@decorators.attr(type='smoke')
|
||||||
def test_update_no_error(self):
|
def test_update_no_error(self):
|
||||||
if not test_utils.call_until_true(
|
if not test_utils.call_until_true(
|
||||||
|
|
|
@ -14,7 +14,9 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
import collections
|
import collections
|
||||||
|
import random
|
||||||
import re
|
import re
|
||||||
|
import string
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from tempest.common import credentials_factory as credentials
|
from tempest.common import credentials_factory as credentials
|
||||||
|
@ -258,3 +260,29 @@ class ScenarioPolicyBase(manager.NetworkScenarioTest):
|
||||||
"floating-ip {src}".format(dest=remote_ip,
|
"floating-ip {src}".format(dest=remote_ip,
|
||||||
src=floating_ip))
|
src=floating_ip))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
def _create_random_policy(self):
|
||||||
|
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
|
||||||
|
for x in range(10))
|
||||||
|
body = {"name": policy_name}
|
||||||
|
resp = self.os_admin.congress_client.create_policy(body)
|
||||||
|
self.addCleanup(self.os_admin.congress_client.delete_policy,
|
||||||
|
resp['id'])
|
||||||
|
return resp['name']
|
||||||
|
|
||||||
|
def _create_policy_rule(self, policy_name, rule, rule_name=None,
|
||||||
|
comment=None):
|
||||||
|
body = {'rule': rule}
|
||||||
|
if rule_name:
|
||||||
|
body['name'] = rule_name
|
||||||
|
if comment:
|
||||||
|
body['comment'] = comment
|
||||||
|
client = self.os_admin.congress_client
|
||||||
|
response = client.create_policy_rule(policy_name, body)
|
||||||
|
if response:
|
||||||
|
self.addCleanup(client.delete_policy_rule, policy_name,
|
||||||
|
response['id'])
|
||||||
|
return response
|
||||||
|
else:
|
||||||
|
raise Exception('Failed to create policy rule (%s, %s)'
|
||||||
|
% (policy_name, rule))
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import random
|
|
||||||
import string
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from tempest.common import utils
|
from tempest.common import utils
|
||||||
|
@ -27,6 +25,7 @@ from tempest.lib import exceptions
|
||||||
from congress_tempest_plugin.tests.scenario import helper
|
from congress_tempest_plugin.tests.scenario import helper
|
||||||
from congress_tempest_plugin.tests.scenario import manager_congress
|
from congress_tempest_plugin.tests.scenario import manager_congress
|
||||||
|
|
||||||
|
|
||||||
CONF = config.CONF
|
CONF = config.CONF
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,32 +46,6 @@ class TestPolicyBasicOps(manager_congress.ScenarioPolicyBase):
|
||||||
self.keypairs = {}
|
self.keypairs = {}
|
||||||
self.servers = []
|
self.servers = []
|
||||||
|
|
||||||
def _create_random_policy(self):
|
|
||||||
policy_name = "nova_%s" % ''.join(random.choice(string.ascii_lowercase)
|
|
||||||
for x in range(10))
|
|
||||||
body = {"name": policy_name}
|
|
||||||
resp = self.os_admin.congress_client.create_policy(body)
|
|
||||||
self.addCleanup(self.os_admin.congress_client.delete_policy,
|
|
||||||
resp['id'])
|
|
||||||
return resp['name']
|
|
||||||
|
|
||||||
def _create_policy_rule(self, policy_name, rule, rule_name=None,
|
|
||||||
comment=None):
|
|
||||||
body = {'rule': rule}
|
|
||||||
if rule_name:
|
|
||||||
body['name'] = rule_name
|
|
||||||
if comment:
|
|
||||||
body['comment'] = comment
|
|
||||||
client = self.os_admin.congress_client
|
|
||||||
response = client.create_policy_rule(policy_name, body)
|
|
||||||
if response:
|
|
||||||
self.addCleanup(client.delete_policy_rule, policy_name,
|
|
||||||
response['id'])
|
|
||||||
return response
|
|
||||||
else:
|
|
||||||
raise Exception('Failed to create policy rule (%s, %s)'
|
|
||||||
% (policy_name, rule))
|
|
||||||
|
|
||||||
def _create_test_server(self, name=None):
|
def _create_test_server(self, name=None):
|
||||||
image_ref = CONF.compute.image_ref
|
image_ref = CONF.compute.image_ref
|
||||||
flavor_ref = CONF.compute.flavor_ref
|
flavor_ref = CONF.compute.flavor_ref
|
||||||
|
|
Loading…
Reference in New Issue