Functional test: notification with image.update

Test if notification is working thanks to test_plugin which subscribed
to image.update notifications.

Change-Id: I46fa1bf0674e5d4f9ccb6cc6f3e67eb443d0c978
This commit is contained in:
Romain Ziba 2015-05-29 11:27:34 +02:00
parent 18bfed36de
commit 4d6c183de0
4 changed files with 82 additions and 9 deletions

View File

@ -161,11 +161,15 @@ class TestPlugin(base.PluginBase):
'event': event_type,
'payload': payload})
if ('START' in payload['name']and self.task_id is None):
self.task_id = self.manager.\
_add_recurrent_task(self.act_short,
1,
task_name='TEST_PLUGIN_START_PAYLOAD')
self.task_id = self.manager.create_task(
{},
self._uuid,
'act_short',
task_type='recurrent',
task_period=1,
task_name='TEST_PLUGIN_START_PAYLOAD')
LOG.info('Start cycling task id %s', self.task_id)
if ('STOP' in payload['name']):
try:
self.manager._force_delete_recurrent_task(self.task_id)

View File

@ -22,7 +22,7 @@ class AlarmTestsV1(base.TestCase):
_service = 'security'
def test_list_alarms(self):
resp, body = self.client.get("v1/security_alarms")
resp, body = self.security_client.get("v1/security_alarms")
self.assertEqual(200, resp.status)
@ -31,7 +31,7 @@ class ReportTestsV1(base.TestCase):
_service = 'security'
def test_list_reports(self):
resp, body = self.client.get("v1/security_reports")
resp, body = self.security_client.get("v1/security_reports")
self.assertEqual(200, resp.status)
@ -40,7 +40,7 @@ class TaskTestsV1(base.TestCase):
_service = 'security'
def test_list_tasks(self):
resp, body = self.client.get("v1/tasks")
resp, body = self.security_client.get("v1/tasks")
self.assertEqual(200, resp.status)
@ -49,5 +49,5 @@ class PluginTestsV1(base.TestCase):
_service = 'security'
def test_list_plugins(self):
resp, body = self.client.get("v1/plugins")
resp, body = self.security_client.get("v1/plugins")
self.assertEqual(200, resp.status)

View File

@ -111,7 +111,7 @@ class TestCase(base.BaseTestCase):
else:
cls.mgr = clients.Manager()
cls.client = CerberusClientV1(
cls.security_client = CerberusClientV1(
cls.mgr.auth_provider, cls._service)
def setUp(self):

View File

@ -0,0 +1,69 @@
#
# Copyright (c) 2015 EUROGICIEL
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from tempest import test
from tempest_lib.common.utils import data_utils
from cerberus.tests.functional import base
class NotificationTests(base.TestCase):
_service = 'security'
@test.attr(type='gate')
def test_notification_image(self):
# Create image
image_name = data_utils.rand_name('image')
glance_resp = self.mgr.image_client.create_image(image_name,
'bare',
'iso',
visibility='private')
# Remove image at the end
self.addCleanup(self.mgr.image_client.delete_image, glance_resp['id'])
self.assertEqual('queued', glance_resp['status'])
image_id = glance_resp['id']
resp, task_list_0 = self.security_client.get("v1/tasks")
task_list_0 = json.loads(task_list_0)
# Update Image
self.mgr.image_client.update_image(image_id,
'START')
# Verifying task has been created
resp, task_list_1 = self.security_client.get("v1/tasks")
task_list_1 = json.loads(task_list_1)
import pdb
pdb.set_trace()
self.assertEqual(len(task_list_0.get('tasks', 0)) + 1,
len(task_list_1.get('tasks', 0)))
# Update Image
self.mgr.image_client.update_image(image_id,
'STOP')
# Verifying task has been created
resp, task_list_2 = self.security_client.get("v1/tasks")
task_list_2 = json.loads(task_list_2)
self.assertEqual(len(task_list_1.get('tasks', 0)) - 1,
len(task_list_2.get('tasks', 0)))