Remove pdb
Change-Id: Ia4b24e6272422c47c48bd75f0bcdb8e0a2de48cc More functional tests Change-Id: I5bb9115199e446fe9e6dbc27ac53efa6dbb06939
This commit is contained in:
parent
4d6c183de0
commit
7d7cc5bf90
|
@ -14,6 +14,8 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
import json
|
||||
|
||||
from cerberus.tests.functional import base
|
||||
|
||||
|
||||
|
@ -22,7 +24,8 @@ class AlarmTestsV1(base.TestCase):
|
|||
_service = 'security'
|
||||
|
||||
def test_list_alarms(self):
|
||||
resp, body = self.security_client.get("v1/security_alarms")
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/security_alarms')
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
|
||||
|
@ -31,7 +34,8 @@ class ReportTestsV1(base.TestCase):
|
|||
_service = 'security'
|
||||
|
||||
def test_list_reports(self):
|
||||
resp, body = self.security_client.get("v1/security_reports")
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/security_reports')
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
|
||||
|
@ -40,14 +44,79 @@ class TaskTestsV1(base.TestCase):
|
|||
_service = 'security'
|
||||
|
||||
def test_list_tasks(self):
|
||||
resp, body = self.security_client.get("v1/tasks")
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/tasks')
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
def test_create_unique_task_not_persistent(self):
|
||||
plugin_id = None
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/plugins',
|
||||
)
|
||||
plugins = json.loads(body).get('plugins', None)
|
||||
if plugins is not None:
|
||||
for plugin in plugins:
|
||||
if (plugin.get('name', None) ==
|
||||
'cerberus.plugins.test_plugin.TestPlugin'):
|
||||
plugin_id = plugin.get('uuid', None)
|
||||
|
||||
self.assertIsNotNone(plugin_id,
|
||||
message='cerberus.plugins.test_plugin.TestPlugin '
|
||||
'must exist and have an id')
|
||||
|
||||
task = {
|
||||
"name": "unique_task_np",
|
||||
"method": "act_short",
|
||||
"plugin_id": plugin_id,
|
||||
"type": "unique"
|
||||
}
|
||||
headers = {'content-type': 'application/json'}
|
||||
resp, body = self.security_client.post(
|
||||
self.security_client._version + '/tasks', json.dumps(task),
|
||||
headers=headers)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
def test_create_recurrent_task_not_persistent(self):
|
||||
plugin_id = None
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/plugins',
|
||||
)
|
||||
plugins = json.loads(body).get('plugins', None)
|
||||
if plugins is not None:
|
||||
for plugin in plugins:
|
||||
if (plugin.get('name', None) ==
|
||||
'cerberus.plugins.test_plugin.TestPlugin'):
|
||||
plugin_id = plugin.get('uuid', None)
|
||||
|
||||
self.assertIsNotNone(plugin_id,
|
||||
message='cerberus.plugins.test_plugin.TestPlugin '
|
||||
'must exist and have an id')
|
||||
|
||||
task = {
|
||||
"name": "recurrent_task_np",
|
||||
"method": "act_short",
|
||||
"plugin_id": plugin_id,
|
||||
"type": "recurrent",
|
||||
"period": 3
|
||||
}
|
||||
headers = {'content-type': 'application/json'}
|
||||
resp, body = self.security_client.post(
|
||||
self.security_client._version + '/tasks', json.dumps(task),
|
||||
headers=headers)
|
||||
task_id = json.loads(body).get('id', None)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertIsNotNone(task_id)
|
||||
resp, body = self.security_client.delete(
|
||||
self.security_client._version + '/tasks/' + task_id,
|
||||
)
|
||||
self.assertEqual(204, resp.status)
|
||||
|
||||
|
||||
class PluginTestsV1(base.TestCase):
|
||||
|
||||
_service = 'security'
|
||||
|
||||
def test_list_plugins(self):
|
||||
resp, body = self.security_client.get("v1/plugins")
|
||||
resp, body = self.security_client.get(
|
||||
self.security_client._version + '/plugins')
|
||||
self.assertEqual(200, resp.status)
|
||||
|
|
|
@ -74,8 +74,9 @@ class CerberusClientBase(rest_client.RestClient):
|
|||
|
||||
class CerberusClientV1(CerberusClientBase):
|
||||
|
||||
def list(self):
|
||||
self.get("http://127.0.0.1:8300/v1")
|
||||
def __init__(self, auth_provider, service_type):
|
||||
super(CerberusClientV1, self).__init__(auth_provider, service_type)
|
||||
self._version = 'v1'
|
||||
|
||||
|
||||
class AuthProv(auth.KeystoneV2AuthProvider):
|
||||
|
|
|
@ -34,36 +34,30 @@ class NotificationTests(base.TestCase):
|
|||
'bare',
|
||||
'iso',
|
||||
visibility='private')
|
||||
|
||||
# Remove image at the end
|
||||
self.addCleanup(self.mgr.image_client.delete_image, glance_resp['id'])
|
||||
self.assertEqual('queued', glance_resp['status'])
|
||||
image_id = glance_resp['id']
|
||||
|
||||
# Remove image at the end
|
||||
self.addCleanup(self.mgr.image_client.delete_image, glance_resp['id'])
|
||||
|
||||
# Check how many tasks there are at the beginning
|
||||
resp, task_list_0 = self.security_client.get("v1/tasks")
|
||||
task_list_0 = json.loads(task_list_0)
|
||||
|
||||
# Update Image
|
||||
self.mgr.image_client.update_image(image_id,
|
||||
'START')
|
||||
self.mgr.image_client.update_image(image_id, 'START')
|
||||
|
||||
# Verifying task has been created
|
||||
resp, task_list_1 = self.security_client.get("v1/tasks")
|
||||
task_list_1 = json.loads(task_list_1)
|
||||
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
|
||||
self.assertEqual(len(task_list_0.get('tasks', 0)) + 1,
|
||||
len(task_list_1.get('tasks', 0)))
|
||||
|
||||
# Update Image
|
||||
self.mgr.image_client.update_image(image_id,
|
||||
'STOP')
|
||||
self.mgr.image_client.update_image(image_id, 'STOP')
|
||||
|
||||
# Verifying task has been created
|
||||
# Verify task has been created
|
||||
resp, task_list_2 = self.security_client.get("v1/tasks")
|
||||
task_list_2 = json.loads(task_list_2)
|
||||
|
||||
self.assertEqual(len(task_list_1.get('tasks', 0)) - 1,
|
||||
len(task_list_2.get('tasks', 0)))
|
||||
|
|
Loading…
Reference in New Issue