Add Quantum handlers for floating ip with tests

Change-Id: Iae90d73d3627174ce2db595d75cd4db579d693ac
This commit is contained in:
Endre Karlson 2012-11-18 21:51:45 +01:00
parent a7844a03de
commit 070f1896fa
3 changed files with 142 additions and 0 deletions

View File

@ -0,0 +1,67 @@
# Copyright 2012 Bouvet ASA
#
# Author: Endre Karlson <endre.karlson@bouvet.no>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.notification_handler.base import BaseAddressHandler
LOG = logging.getLogger(__name__)
class QuantumFloatingHandler(BaseAddressHandler):
__plugin_name__ = 'quantum_floatingip'
""" Handler for Quantum's notifications """
@classmethod
def get_opts(cls):
opts = super(QuantumFloatingHandler, cls).get_opts()
opts.extend([
cfg.ListOpt('notification-topics', default=['monitor']),
cfg.StrOpt('control-exchange', default='quantum')])
return opts
def get_exchange_topics(self):
exchange = self.config.control_exchange
topics = [topic + ".info"
for topic in self.config.notification_topics]
return (exchange, topics)
def get_event_types(self):
return [
'floatingip.update.end',
]
def process_notification(self, event_type, payload):
LOG.debug('%s recieved notification - %s',
self.get_canonical_name(), event_type)
# FIXME: Quantum doesn't send ipv in the payload, should maybe
# determine this?
if event_type not in self.get_event_types():
raise ValueError('NovaFixedHandler recieved an invalid event type')
floating = payload['floatingip']
if floating['fixed_ip_address']:
address = {
'version': 4,
'address': floating['floating_ip_address']}
self._create([address], payload, resource_id=floating['id'],
resource_type='floatingip')
elif not floating['fixed_ip_address']:
self._delete(resource_id=payload['floatingip']['id'],
resource_type='floatingip')

View File

@ -0,0 +1,73 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nose import SkipTest
from moniker.openstack.common import log as logging
from moniker.tests.test_notification_handler import AddressHandlerTestCase
from moniker.notification_handler import quantum
LOG = logging.getLogger(__name__)
class QuantumFloatingTestCase(AddressHandlerTestCase):
__test__ = True
handler_cls = quantum.QuantumFloatingHandler
def test_floatingip_associate(self):
event_type = 'floatingip.update.end'
fixture = self.get_notification_fixture(
'quantum', event_type + '_associate')
self.assertIn(event_type, self.handler.get_event_types())
# Ensure we start with 0 records
records = self.central_service.get_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))
self.handler.process_notification(event_type, fixture['payload'])
# Ensure we now have exactly 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
self.assertEqual(len(records), 1)
def test_floatingip_disassociate(self):
start_event_type = 'floatingip.update.end'
start_fixture = self.get_notification_fixture(
'quantum', start_event_type + '_associate')
self.handler.process_notification(start_event_type,
start_fixture['payload'])
event_type = 'floatingip.update.end'
fixture = self.get_notification_fixture(
'quantum', event_type + '_disassociate')
self.assertIn(event_type, self.handler.get_event_types())
# Ensure we start with at least 1 record
records = self.central_service.get_records(self.admin_context,
self.domain_id)
self.assertGreaterEqual(len(records), 1)
self.handler.process_notification(event_type, fixture['payload'])
records = self.central_service.get_records(self.admin_context,
self.domain_id)
self.assertEqual(0, len(records))

View File

@ -62,6 +62,8 @@ setup(
[moniker.notification.handler]
nova_fixed = moniker.notification_handler.nova:NovaFixedHandler
quantum_floatingip = moniker.notification_handler.quantum\
:QuantumFloatingHandler
[moniker.backend]
bind9 = moniker.backend.impl_bind9:Bind9Backend