Tests for extension, db and plugin for LBaaS V2

Adding needed driver interface changes for tests.
Adding LoggingNoop driver needed changes for tests.
Adding extension, plugin, and db unit tests.

Partially-implements: blueprint lbaas-api-and-objmodel-improvement

Change-Id: I1f7762bf3c9484b8bf46ce32ab78574aa081324b
Co-authored-by: Brandon Logan <brandon.logan@rackspace.com>
Co-authored-by: Phillip Toohill <phillip.toohill@rackspace.com>
Co-authored-by: Doug Wiegley <dougw@a10networks.com>
This commit is contained in:
Brandon Logan 2014-07-08 17:30:45 -05:00
parent 635a4aa6e0
commit 55bac1ffc1
5 changed files with 2421 additions and 60 deletions

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.db.loadbalancer import models
from neutron.services.loadbalancer.drivers import driver_mixins
@ -46,42 +46,63 @@ class LoadBalancerBaseDriver(object):
def __init__(self, plugin):
self.plugin = plugin
def activate_cascade(self, context, obj):
self.plugin.activate_linked_entities(context, obj)
class BaseLoadBalancerManager(driver_mixins.BaseRefreshMixin,
driver_mixins.BaseStatsMixin,
driver_mixins.BaseStatusUpdateMixin,
driver_mixins.BaseDeleteHelperMixin,
driver_mixins.BaseManagerMixin):
model_class = models.LoadBalancer
def __init__(self, driver):
super(BaseLoadBalancerManager, self).__init__(driver)
# TODO(dougw), use lb_db.LoadBalancer when v2 lbaas
# TODO(dougw), get rid of __init__() in StatusHelperManager, and
# the if is not None clauses; after fixing this next line,
# it can become a mandatory variable for that subclass.
self.model_class = None
@property
def db_delete_method(self):
return self.driver.plugin.db.delete_loadbalancer
class BaseListenerManager(driver_mixins.BaseManagerMixin):
pass
class BaseListenerManager(driver_mixins.BaseStatusUpdateMixin,
driver_mixins.BaseDeleteHelperMixin,
driver_mixins.BaseManagerMixin):
model_class = models.Listener
@property
def db_delete_method(self):
return self.driver.plugin.db.delete_listener
def defer_cascade(self, context, listener):
self.driver.plugin.defer_listener(context, listener)
class BasePoolManager(driver_mixins.BaseStatusUpdateMixin,
driver_mixins.BaseDeleteHelperMixin,
driver_mixins.BaseManagerMixin):
model_class = models.PoolV2
def __init__(self, driver):
super(BasePoolManager, self).__init__(driver)
self.model_class = lb_db.Pool
@property
def db_delete_method(self):
return self.driver.plugin.db.delete_pool
def defer_cascade(self, context, pool):
self.driver.plugin.defer_pool(context, pool)
class BaseMemberManager(driver_mixins.BaseStatusUpdateMixin,
driver_mixins.BaseDeleteHelperMixin,
driver_mixins.BaseManagerMixin):
model_class = models.MemberV2
def __init__(self, driver):
super(BaseMemberManager, self).__init__(driver)
self.model_class = lb_db.Member
@property
def db_delete_method(self):
return self.driver.plugin.delete_member
class BaseHealthMonitorManager(
driver_mixins.BaseHealthMonitorStatusUpdateMixin,
driver_mixins.BaseManagerMixin):
pass
class BaseHealthMonitorManager(driver_mixins.BaseStatusUpdateMixin,
driver_mixins.BaseDeleteHelperMixin,
driver_mixins.BaseManagerMixin):
model_class = models.HealthMonitorV2
@property
def db_delete_method(self):
return self.driver.plugin.db.delete_healthmonitor

View File

@ -56,30 +56,26 @@ class BaseStatsMixin(object):
class BaseStatusUpdateMixin(object):
# Status update helpers
# Note: You must set self.model_class to an appropriate neutron model
# Note: You must set model_class to an appropriate neutron model
# in your base manager class.
def active(self, context, model_id):
if self.model_class is not None:
self.driver.plugin.update_status(context, self.model_class,
model_id, constants.ACTIVE)
self.driver.plugin.db.update_status(context, self.model_class,
model_id, constants.ACTIVE)
def failed(self, context, model_id):
if self.model_class is not None:
self.driver.plugin.update_status(context, self.model_class,
model_id, constants.ERROR)
self.driver.plugin.db.update_status(context, self.model_class,
model_id, constants.ERROR)
def defer(self, context, model_id):
self.driver.plugin.db.update_status(context, self.model_class,
model_id, constants.DEFERRED)
class BaseHealthMonitorStatusUpdateMixin(object):
class BaseDeleteHelperMixin(object):
def active(self, context, health_monitor_id, pool_id):
self.driver.plugin.update_pool_health_monitor(context,
health_monitor_id,
pool_id,
constants.ACTIVE)
# DB delete helper
# Must define appropriate db delete function
def failed(self, context, health_monitor_id, pool_id):
self.driver.plugin.update_pool_health_monitor(context,
health_monitor_id,
pool_id,
constants.ERROR)
def db_delete(self, context, model_id):
self.db_delete_method(context, model_id)

View File

@ -70,26 +70,25 @@ class LoggingNoopLoadBalancerManager(LoggingNoopCommonManager,
def create(self, context, loadbalancer):
super(LoggingNoopLoadBalancerManager, self).create(context,
loadbalancer)
self.driver.plugin.activate_linked_entities(context,
loadbalancer)
self.driver.activate_cascade(context, loadbalancer)
def update(self, context, old_loadbalancer, loadbalancer):
super(LoggingNoopLoadBalancerManager, self).update(context,
old_loadbalancer,
loadbalancer)
self.driver.plugin.activate_linked_entities(context, loadbalancer)
self.driver.activate_cascade(context, loadbalancer)
def delete(self, context, loadbalancer):
super(LoggingNoopLoadBalancerManager, self).delete(context,
loadbalancer)
self.driver.plugin._delete_db_loadbalancer(context, loadbalancer.id)
self.db_delete(context, loadbalancer.id)
class LoggingNoopListenerManager(LoggingNoopCommonManager,
driver_base.BaseListenerManager):
def create(self, context, obj):
super(LoggingNoopListenerManager, self).create(context, obj)
self.driver.plugin.activate_linked_entities(context, obj)
self.driver.activate_cascade(context, obj)
def update(self, context, old_listener, new_listener):
super(LoggingNoopListenerManager, self).update(context, old_listener,
@ -97,22 +96,22 @@ class LoggingNoopListenerManager(LoggingNoopCommonManager,
if new_listener.attached_to_loadbalancer():
# Always activate listener and its children if attached to
# loadbalancer
self.driver.plugin.activate_linked_entities(context, new_listener)
self.driver.activate_cascade(context, new_listener)
elif old_listener.attached_to_loadbalancer():
# If listener has just been detached from loadbalancer
# defer listener and its children
self.driver.plugin.defer_listener(context, new_listener)
self.defer_cascade(context, new_listener)
if not new_listener.default_pool and old_listener.default_pool:
# if listener's pool has been detached then defer the pool
# and its children
self.driver.plugin.defer_pool(context, old_listener.default_pool)
self.driver.pool.defer_cascade(context, old_listener.default_pool)
def delete(self, context, listener):
super(LoggingNoopListenerManager, self).delete(context, listener)
if listener.default_pool:
self.driver.plugin.defer_pool(context, listener.default_pool)
self.driver.plugin._delete_db_listener(context, listener.id)
self.driver.pool.defer_cascade(context, listener.default_pool)
self.db_delete(context, listener.id)
class LoggingNoopPoolManager(LoggingNoopCommonManager,
@ -121,36 +120,36 @@ class LoggingNoopPoolManager(LoggingNoopCommonManager,
super(LoggingNoopPoolManager, self).create(context, pool)
# This shouldn't be called since a pool cannot be created and linked
# to a loadbalancer at the same time
self.driver.plugin.activate_linked_entities(context, pool)
self.driver.activate_cascade(context, pool)
def update(self, context, old_pool, pool):
super(LoggingNoopPoolManager, self).update(context, old_pool, pool)
self.driver.plugin.activate_linked_entities(context, pool)
self.driver.activate_cascade(context, pool)
if not pool.healthmonitor and old_pool.healthmonitor:
self.driver.plugin.defer_healthmonitor(context,
old_pool.healthmonitor)
self.driver.health_monitor.defer(context,
old_pool.healthmonitor.id)
def delete(self, context, pool):
super(LoggingNoopPoolManager, self).delete(context, pool)
if pool.healthmonitor:
self.driver.plugin.defer_healthmonitor(context, pool.healthmonitor)
self.driver.plugin._delete_db_pool(context, pool.id)
self.driver.health_monitor.defer(context, pool.healthmonitor.id)
self.db_delete(context, pool.id)
class LoggingNoopMemberManager(LoggingNoopCommonManager,
driver_base.BaseMemberManager):
def create(self, context, member):
super(LoggingNoopMemberManager, self).create(context, member)
self.driver.plugin.activate_linked_entities(context, member)
self.driver.activate_cascade(context, member)
def update(self, context, old_member, member):
super(LoggingNoopMemberManager, self).update(context, old_member,
member)
self.driver.plugin.activate_linked_entities(context, member)
self.driver.activate_cascade(context, member)
def delete(self, context, member):
super(LoggingNoopMemberManager, self).delete(context, member)
self.driver.plugin._delete_db_pool_member(context, member.id)
self.db_delete(context, member.id)
class LoggingNoopHealthMonitorManager(LoggingNoopCommonManager,
@ -159,15 +158,15 @@ class LoggingNoopHealthMonitorManager(LoggingNoopCommonManager,
def create(self, context, healthmonitor):
super(LoggingNoopHealthMonitorManager, self).create(context,
healthmonitor)
self.driver.plugin.activate_linked_entities(context, healthmonitor)
self.driver.activate_cascade(context, healthmonitor)
def update(self, context, old_healthmonitor, healthmonitor):
super(LoggingNoopHealthMonitorManager, self).update(context,
old_healthmonitor,
healthmonitor)
self.driver.plugin.activate_linked_entities(context, healthmonitor)
self.driver.activate_cascade(context, healthmonitor)
def delete(self, context, healthmonitor):
super(LoggingNoopHealthMonitorManager, self).delete(context,
healthmonitor)
self.driver.plugin._delete_db_pool_member(context, healthmonitor.id)
self.db_delete(context, healthmonitor.id)

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,7 @@ from webob import exc
from neutron.api.v2 import attributes as attr
from neutron.extensions import loadbalancer
from neutron.extensions import loadbalancerv2
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
from neutron.tests.unit import test_api_v2
@ -460,3 +461,523 @@ class LoadBalancerExtensionTestCase(test_api_v2_extension.ExtensionTestCase):
class LoadBalancerExtensionTestCaseXML(LoadBalancerExtensionTestCase):
fmt = 'xml'
class LoadBalancerExtensionV2TestCase(test_api_v2_extension.ExtensionTestCase):
fmt = 'json'
def setUp(self):
super(LoadBalancerExtensionV2TestCase, self).setUp()
self._setUpExtension(
'neutron.extensions.loadbalancerv2.LoadBalancerPluginBaseV2',
constants.LOADBALANCERV2, loadbalancerv2.RESOURCE_ATTRIBUTE_MAP,
loadbalancerv2.Loadbalancerv2, 'lbaas', use_quota=True)
def test_loadbalancer_create(self):
lb_id = _uuid()
data = {'loadbalancer': {'name': 'lb1',
'description': 'descr_lb1',
'tenant_id': _uuid(),
'vip_subnet_id': _uuid(),
'admin_state_up': True,
'vip_address': '127.0.0.1'}}
return_value = copy.copy(data['loadbalancer'])
return_value.update({'status': 'ACTIVE', 'id': lb_id})
instance = self.plugin.return_value
instance.create_loadbalancer.return_value = return_value
res = self.api.post(_get_path('lbaas/loadbalancers', fmt=self.fmt),
self.serialize(data),
content_type='application/{0}'.format(self.fmt))
instance.create_loadbalancer.assert_called_with(mock.ANY,
loadbalancer=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('loadbalancer', res)
self.assertEqual(res['loadbalancer'], return_value)
def test_loadbalancer_list(self):
lb_id = _uuid()
return_value = [{'name': 'lb1',
'admin_state_up': True,
'tenant_id': _uuid(),
'id': lb_id}]
instance = self.plugin.return_value
instance.get_loadbalancers.return_value = return_value
res = self.api.get(_get_path('lbaas/loadbalancers', fmt=self.fmt))
instance.get_loadbalancers.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
def test_loadbalancer_update(self):
lb_id = _uuid()
update_data = {'loadbalancer': {'admin_state_up': False}}
return_value = {'name': 'lb1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': lb_id}
instance = self.plugin.return_value
instance.update_loadbalancer.return_value = return_value
res = self.api.put(_get_path('lbaas/loadbalancers',
id=lb_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_loadbalancer.assert_called_with(
mock.ANY, lb_id, loadbalancer=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('loadbalancer', res)
self.assertEqual(res['loadbalancer'], return_value)
def test_loadbalancer_get(self):
lb_id = _uuid()
return_value = {'name': 'lb1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': lb_id}
instance = self.plugin.return_value
instance.get_loadbalancer.return_value = return_value
res = self.api.get(_get_path('lbaas/loadbalancers',
id=lb_id,
fmt=self.fmt))
instance.get_loadbalancer.assert_called_with(mock.ANY, lb_id,
fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('loadbalancer', res)
self.assertEqual(res['loadbalancer'], return_value)
def test_loadbalancer_delete(self):
self._test_entity_delete('loadbalancer')
def test_listener_create(self):
listener_id = _uuid()
data = {'listener': {'tenant_id': _uuid(),
'name': 'listen-name-1',
'description': 'listen-1-desc',
'protocol': 'HTTP',
'protocol_port': 80,
'connection_limit': 100,
'admin_state_up': True,
'loadbalancer_id': _uuid(),
'default_pool_id': _uuid()}}
return_value = copy.copy(data['listener'])
return_value.update({'status': 'ACTIVE', 'id': listener_id})
instance = self.plugin.return_value
instance.create_listener.return_value = return_value
res = self.api.post(_get_path('lbaas/listeners', fmt=self.fmt),
self.serialize(data),
content_type='application/{0}'.format(self.fmt))
instance.create_listener.assert_called_with(mock.ANY,
listener=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('listener', res)
self.assertEqual(res['listener'], return_value)
def test_listener_list(self):
listener_id = _uuid()
return_value = [{'admin_state_up': True,
'tenant_id': _uuid(),
'id': listener_id}]
instance = self.plugin.return_value
instance.get_listeners.return_value = return_value
res = self.api.get(_get_path('lbaas/listeners', fmt=self.fmt))
instance.get_listeners.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
def test_listener_update(self):
listener_id = _uuid()
update_data = {'listener': {'admin_state_up': False}}
return_value = {'name': 'listener1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': listener_id}
instance = self.plugin.return_value
instance.update_listener.return_value = return_value
res = self.api.put(_get_path('lbaas/listeners',
id=listener_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_listener.assert_called_with(
mock.ANY, listener_id, listener=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('listener', res)
self.assertEqual(res['listener'], return_value)
def test_listener_get(self):
listener_id = _uuid()
return_value = {'name': 'listener1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': listener_id}
instance = self.plugin.return_value
instance.get_listener.return_value = return_value
res = self.api.get(_get_path('lbaas/listeners',
id=listener_id,
fmt=self.fmt))
instance.get_listener.assert_called_with(mock.ANY, listener_id,
fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('listener', res)
self.assertEqual(res['listener'], return_value)
def test_listener_delete(self):
self._test_entity_delete('listener')
def test_pool_create(self):
pool_id = _uuid()
hm_id = _uuid()
data = {'pool': {'name': 'pool1',
'description': 'descr_pool1',
'protocol': 'HTTP',
'lb_algorithm': 'ROUND_ROBIN',
'healthmonitor_id': hm_id,
'admin_state_up': True,
'tenant_id': _uuid(),
'session_persistence': {}}}
return_value = copy.copy(data['pool'])
return_value.update({'status': "ACTIVE", 'id': pool_id})
instance = self.plugin.return_value
instance.create_pool.return_value = return_value
res = self.api.post(_get_path('lbaas/pools', fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_pool.assert_called_with(mock.ANY,
pool=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('pool', res)
self.assertEqual(res['pool'], return_value)
def test_pool_list(self):
pool_id = _uuid()
return_value = [{'name': 'pool1',
'admin_state_up': True,
'tenant_id': _uuid(),
'id': pool_id}]
instance = self.plugin.return_value
instance.get_pools.return_value = return_value
res = self.api.get(_get_path('lbaas/pools', fmt=self.fmt))
instance.get_pools.assert_called_with(mock.ANY, fields=mock.ANY,
filters=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
def test_pool_update(self):
pool_id = _uuid()
update_data = {'pool': {'admin_state_up': False}}
return_value = {'name': 'pool1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': pool_id}
instance = self.plugin.return_value
instance.update_pool.return_value = return_value
res = self.api.put(_get_path('lbaas/pools', id=pool_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_pool.assert_called_with(mock.ANY, pool_id,
pool=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('pool', res)
self.assertEqual(res['pool'], return_value)
def test_pool_get(self):
pool_id = _uuid()
return_value = {'name': 'pool1',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': pool_id}
instance = self.plugin.return_value
instance.get_pool.return_value = return_value
res = self.api.get(_get_path('lbaas/pools', id=pool_id,
fmt=self.fmt))
instance.get_pool.assert_called_with(mock.ANY, pool_id,
fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('pool', res)
self.assertEqual(res['pool'], return_value)
def test_pool_delete(self):
self._test_entity_delete('pool')
def test_pool_member_create(self):
subnet_id = _uuid()
member_id = _uuid()
data = {'member': {'address': '10.0.0.1',
'protocol_port': 80,
'weight': 1,
'subnet_id': subnet_id,
'admin_state_up': True,
'tenant_id': _uuid()}}
return_value = copy.copy(data['member'])
return_value.update({'status': "ACTIVE", 'id': member_id})
instance = self.plugin.return_value
instance.create_pool_member.return_value = return_value
res = self.api.post(_get_path('lbaas/pools/pid1/members',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s'
% self.fmt)
instance.create_pool_member.assert_called_with(mock.ANY,
pool_id='pid1',
member=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('member', res)
self.assertEqual(res['member'], return_value)
def test_pool_member_list(self):
member_id = _uuid()
return_value = [{'name': 'member1',
'admin_state_up': True,
'tenant_id': _uuid(),
'id': member_id}]
instance = self.plugin.return_value
instance.get_pools.return_value = return_value
res = self.api.get(_get_path('lbaas/pools/pid1/members',
fmt=self.fmt))
instance.get_pool_members.assert_called_with(mock.ANY,
fields=mock.ANY,
filters=mock.ANY,
pool_id='pid1')
self.assertEqual(res.status_int, exc.HTTPOk.code)
def test_pool_member_update(self):
self.skipTest('mock autospec bug causes false negative.'
'Similar bug: '
'https://code.google.com/p/mock/issues/detail?id=224')
member_id = _uuid()
update_data = {'member': {'admin_state_up': False}}
return_value = {'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': member_id}
instance = self.plugin.return_value
instance.update_pool_member.return_value = return_value
res = self.api.put(_get_path('lbaas/pools/pid1/members',
id=member_id,
fmt=self.fmt),
self.serialize(update_data),
content_type='application/%s'
% self.fmt)
instance.update_pool_member.assert_called_with(mock.ANY,
member_id,
member=update_data,
pool_id='pid1')
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('member', res)
self.assertEqual(res['member'], return_value)
def test_pool_member_get(self):
member_id = _uuid()
return_value = {'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': member_id}
instance = self.plugin.return_value
instance.get_pool_member.return_value = return_value
res = self.api.get(_get_path('lbaas/pools/pid1/members',
id=member_id, fmt=self.fmt))
instance.get_pool_member.assert_called_with(mock.ANY,
member_id,
fields=mock.ANY,
pool_id='pid1')
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('member', res)
self.assertEqual(res['member'], return_value)
def test_pool_member_delete(self):
entity_id = _uuid()
res = self.api.delete(
test_api_v2._get_path('lbaas/pools/pid1/members',
id=entity_id, fmt=self.fmt))
delete_entity = getattr(self.plugin.return_value,
"delete_pool_member")
delete_entity.assert_called_with(mock.ANY, entity_id,
pool_id='pid1')
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
def test_health_monitor_create(self):
health_monitor_id = _uuid()
data = {'healthmonitor': {'type': 'HTTP',
'delay': 2,
'timeout': 1,
'max_retries': 3,
'http_method': 'GET',
'url_path': '/path',
'expected_codes': '200-300',
'admin_state_up': True,
'tenant_id': _uuid()}}
return_value = copy.copy(data['healthmonitor'])
return_value.update({'status': "ACTIVE", 'id': health_monitor_id})
instance = self.plugin.return_value
instance.create_healthmonitor.return_value = return_value
res = self.api.post(_get_path('lbaas/healthmonitors',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt)
instance.create_healthmonitor.assert_called_with(mock.ANY,
healthmonitor=data)
self.assertEqual(res.status_int, exc.HTTPCreated.code)
res = self.deserialize(res)
self.assertIn('healthmonitor', res)
self.assertEqual(res['healthmonitor'], return_value)
def test_health_monitor_create_with_timeout_negative(self):
data = {'healthmonitor': {'type': 'HTTP',
'delay': 2,
'timeout': -1,
'max_retries': 3,
'http_method': 'GET',
'url_path': '/path',
'expected_codes': '200-300',
'admin_state_up': True,
'tenant_id': _uuid()}}
res = self.api.post(_get_path('lbaas/healthmonitors',
fmt=self.fmt),
self.serialize(data),
content_type='application/%s' % self.fmt,
expect_errors=True)
self.assertEqual(400, res.status_int)
def test_health_monitor_list(self):
health_monitor_id = _uuid()
return_value = [{'type': 'HTTP',
'admin_state_up': True,
'tenant_id': _uuid(),
'id': health_monitor_id}]
instance = self.plugin.return_value
instance.get_healthmonitors.return_value = return_value
res = self.api.get(_get_path('lbaas/healthmonitors', fmt=self.fmt))
instance.get_healthmonitors.assert_called_with(
mock.ANY, fields=mock.ANY, filters=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
def test_health_monitor_update(self):
health_monitor_id = _uuid()
update_data = {'healthmonitor': {'admin_state_up': False}}
return_value = {'type': 'HTTP',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': health_monitor_id}
instance = self.plugin.return_value
instance.update_healthmonitor.return_value = return_value
res = self.api.put(_get_path('lbaas/healthmonitors',
id=health_monitor_id,
fmt=self.fmt),
self.serialize(update_data))
instance.update_healthmonitor.assert_called_with(
mock.ANY, health_monitor_id, healthmonitor=update_data)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('healthmonitor', res)
self.assertEqual(res['healthmonitor'], return_value)
def test_health_monitor_get(self):
health_monitor_id = _uuid()
return_value = {'type': 'HTTP',
'admin_state_up': False,
'tenant_id': _uuid(),
'status': "ACTIVE",
'id': health_monitor_id}
instance = self.plugin.return_value
instance.get_healthmonitor.return_value = return_value
res = self.api.get(_get_path('lbaas/healthmonitors',
id=health_monitor_id,
fmt=self.fmt))
instance.get_healthmonitor.assert_called_with(
mock.ANY, health_monitor_id, fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('healthmonitor', res)
self.assertEqual(res['healthmonitor'], return_value)
def test_health_monitor_delete(self):
self._test_entity_delete('healthmonitor')
def test_load_balancer_stats(self):
load_balancer_id = _uuid()
stats = {'stats': 'dummy'}
instance = self.plugin.return_value
instance.stats.return_value = stats
path = _get_path('lbaas/loadbalancers', id=load_balancer_id,
action="stats", fmt=self.fmt)
res = self.api.get(path)
instance.stats.assert_called_with(mock.ANY, load_balancer_id)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertIn('stats', res)
self.assertEqual(res['stats'], stats['stats'])