Merge "Adding code to prevent vip port deletion from port api"

This commit is contained in:
Jenkins 2015-06-12 18:00:09 +00:00 committed by Gerrit Code Review
commit 3e241a78f6
5 changed files with 109 additions and 2 deletions

View File

@ -14,6 +14,10 @@
#
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as n_constants
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin as base_db
from neutron.db import model_base
@ -337,7 +341,7 @@ class LoadBalancerPluginDb(loadbalancer.LoadBalancerPluginBase,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': False,
'device_id': '',
'device_owner': '',
'device_owner': n_constants.DEVICE_OWNER_LOADBALANCER,
'fixed_ips': [fixed_ip]
}
@ -478,6 +482,24 @@ class LoadBalancerPluginDb(loadbalancer.LoadBalancerPluginBase,
if vip.port: # this is a Neutron port
self._core_plugin.delete_port(context, vip.port.id)
def prevent_lbaas_port_deletion(self, context, port_id):
try:
port_db = self._core_plugin._get_port(context, port_id)
except n_exc.PortNotFound:
return
# Check only if the owner is loadbalancer.
if port_db['device_owner'] == n_constants.DEVICE_OWNER_LOADBALANCER:
filters = {'port_id': [port_id]}
if len(self.get_vips(context, filters=filters)) > 0:
reason = _('has device owner %s') % port_db['device_owner']
raise n_exc.ServicePortInUse(port_id=port_db['id'],
reason=reason)
def subscribe(self):
registry.subscribe(
_prevent_lbaas_port_delete_callback, resources.PORT,
events.BEFORE_DELETE)
def get_vip(self, context, id, fields=None):
vip = self._get_resource(context, Vip, id)
return self._make_vip_dict(vip, fields)
@ -815,3 +837,13 @@ class LoadBalancerPluginDb(loadbalancer.LoadBalancerPluginBase,
return self._get_collection(context, HealthMonitor,
self._make_health_monitor_dict,
filters=filters, fields=fields)
def _prevent_lbaas_port_delete_callback(resource, event, trigger, **kwargs):
context = kwargs['context']
port_id = kwargs['port_id']
port_check = kwargs['port_check']
lbaasplugin = manager.NeutronManager.get_service_plugins().get(
constants.LOADBALANCER)
if lbaasplugin and port_check:
lbaasplugin.prevent_lbaas_port_deletion(context, port_id)

View File

@ -14,6 +14,11 @@
# under the License.
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as n_constants
from neutron.common import exceptions as n_exc
from neutron.db import common_db_mixin as base_db
from neutron import manager
from neutron.plugins.common import constants
@ -94,7 +99,7 @@ class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'admin_state_up': False,
'device_id': '',
'device_owner': '',
'device_owner': n_constants.DEVICE_OWNER_LOADBALANCERV2,
'fixed_ips': [fixed_ip]
}
@ -233,6 +238,23 @@ class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
if lb_db.vip_port:
self._core_plugin.delete_port(context, lb_db.vip_port_id)
def prevent_lbaasv2_port_deletion(self, context, port_id):
try:
port_db = self._core_plugin._get_port(context, port_id)
except n_exc.PortNotFound:
return
if port_db['device_owner'] == n_constants.DEVICE_OWNER_LOADBALANCERV2:
filters = {'vip_port_id': [port_id]}
if len(self.get_loadbalancers(context, filters=filters)) > 0:
reason = _('has device owner %s') % port_db['device_owner']
raise n_exc.ServicePortInUse(port_id=port_db['id'],
reason=reason)
def subscribe(self):
registry.subscribe(
_prevent_lbaasv2_port_delete_callback, resources.PORT,
events.BEFORE_DELETE)
def get_loadbalancers(self, context, filters=None):
lb_dbs = self._get_resources(context, models.LoadBalancer,
filters=filters)
@ -549,3 +571,13 @@ class LoadBalancerPluginDbv2(base_db.CommonDbMixin,
loadbalancer_id)
return data_models.LoadBalancerStatistics.from_sqlalchemy_model(
loadbalancer.stats)
def _prevent_lbaasv2_port_delete_callback(resource, event, trigger, **kwargs):
context = kwargs['context']
port_id = kwargs['port_id']
port_check = kwargs['port_check']
lbaasv2plugin = manager.NeutronManager.get_service_plugins().get(
constants.LOADBALANCERV2)
if lbaasv2plugin and port_check:
lbaasv2plugin.db.prevent_lbaasv2_port_deletion(context, port_id)

View File

@ -77,6 +77,7 @@ class LoadBalancerPlugin(ldb.LoadBalancerPluginDb,
"""Initialization for the loadbalancer service plugin."""
self.service_type_manager = st_db.ServiceTypeManager.get_instance()
self._load_drivers()
super(LoadBalancerPlugin, self).subscribe()
def _load_drivers(self):
"""Loads plugin-drivers specified in configuration."""
@ -382,6 +383,7 @@ class LoadBalancerPluginv2(loadbalancerv2.LoadBalancerPluginBaseV2):
self.db = ldbv2.LoadBalancerPluginDbv2()
self.service_type_manager = st_db.ServiceTypeManager.get_instance()
self._load_drivers()
self.db.subscribe()
def _load_drivers(self):
"""Loads plugin-drivers specified in configuration."""

View File

@ -18,6 +18,7 @@ import contextlib
import mock
from neutron.api import extensions
from neutron.common import config
from neutron.common import constants as n_constants
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.db import servicetype_db as sdb
@ -1618,3 +1619,21 @@ class TestLoadBalancer(LoadBalancerPluginDbTestCase):
SystemExit,
loadbalancer_plugin.LoadBalancerPlugin
)
def test_port_delete_via_port_api(self):
port = {
'id': 'my_port_id',
'device_owner': n_constants.DEVICE_OWNER_LOADBALANCER
}
ctx = context.get_admin_context()
port['device_owner'] = n_constants.DEVICE_OWNER_LOADBALANCER
myvips = [{'name': 'vip1'}]
with mock.patch.object(manager.NeutronManager, 'get_plugin') as gp:
self.plugin.get_vips = mock.Mock(return_value=myvips)
plugin = mock.Mock()
gp.return_value = plugin
plugin._get_port.return_value = port
self.assertRaises(n_exc.ServicePortInUse,
self.plugin.prevent_lbaas_port_deletion,
ctx,
port['id'])

View File

@ -21,6 +21,8 @@ import six
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import config
from neutron.common import constants as n_constants
from neutron.common import exceptions as n_exc
from neutron import context
import neutron.db.l3_db # noqa
from neutron.db import servicetype_db as sdb
@ -31,6 +33,7 @@ from oslo_config import cfg
import testtools
import webob.exc
from neutron import manager
from neutron_lbaas.common.cert_manager import cert_manager
from neutron_lbaas.common import exceptions
from neutron_lbaas.db.loadbalancer import models
@ -657,6 +660,25 @@ class LbaasLoadBalancerTests(LbaasPluginDbTestCase):
self.assertEqual(body['loadbalancer'][k],
expected_values[k])
def test_port_delete_via_port_api(self):
port = {
'id': 'my_port_id',
'device_owner': n_constants.DEVICE_OWNER_LOADBALANCERV2
}
ctx = context.get_admin_context()
port['device_owner'] = n_constants.DEVICE_OWNER_LOADBALANCERV2
myloadbalancers = [{'name': 'lb1'}]
with mock.patch.object(manager.NeutronManager, 'get_plugin') as gp:
self.plugin.db.get_loadbalancers = mock.Mock(
return_value=myloadbalancers)
plugin = mock.Mock()
gp.return_value = plugin
plugin._get_port.return_value = port
self.assertRaises(n_exc.ServicePortInUse,
self.plugin.db.prevent_lbaasv2_port_deletion,
ctx,
port['id'])
class ListenerTestBase(LbaasPluginDbTestCase):
def setUp(self):