Add L3 Extension for Distributed Routers

This patch introduces the model and extension
framework for implementing distributed virtual
routing on top of Open vSwitch.

A new admin-only 'distributed' (as opposed to a
'centralized' or legacy-mode) attribute is added
to the API router resource. It is possible to convert
an existing (centralized) router to a distributed
one; the opposite conversion, even though allowed by
the API, may not be honored by the underlying
plugin implementation and an appropriate error will
be reported.

When creating a router (regardless of the user role),
Neutron will rely on a system wide configuration, whose
default currently allows to create 'centralized' routers.

Tests are added for basic unit coverage; when the first
building blocks for neutron-testing-refactor
are complete, functional testing will be added.
This is because we should be moving away from how
extension tests have been done up until now.

Partially-implements: blueprint neutron-ovs-dvr

DocImpact

Change-Id: I7227fbe2718eba6665a5afb5dcaaaa77b341091f
Authored-by:    Swaminathan Vasudevan <swaminathan.vasudevan@hp.com>
Co-Authored-By: Armando Migliaccio <armamig@gmail.com>
This commit is contained in:
Swaminathan Vasudevan 2014-03-31 12:19:03 -07:00 committed by armando-migliaccio
parent fb6e0653ac
commit 1caa51ea68
16 changed files with 970 additions and 40 deletions

View File

@ -2,6 +2,16 @@
# Print more verbose output (set logging level to INFO instead of default WARNING level).
# verbose = False
# =========Start Global Config Option for Distributed L3 Router===============
# Setting the "router_distributed" flag to "True" will default to the creation
# of distributed tenant routers. The admin can override this flag by specifying
# the type of the router on the create request (admin-only attribute). Default
# value is "False" to support legacy mode (centralized) routers.
#
# router_distributed = False
#
# ===========End Global Config Option for Distributed L3 Router===============
# Print debugging output (set logging level to DEBUG instead of default WARNING level).
# debug = False

View File

@ -63,7 +63,9 @@
"delete_port": "rule:admin_or_owner",
"create_router:external_gateway_info:enable_snat": "rule:admin_only",
"create_router:distributed": "rule:admin_only",
"update_router:external_gateway_info:enable_snat": "rule:admin_only",
"update_router:distributed": "rule:admin_only",
"create_firewall": "",
"get_firewall": "rule:admin_or_owner",
@ -104,6 +106,7 @@
"create_router": "rule:regular_user",
"get_router": "rule:admin_or_owner",
"get_router:distributed": "rule:admin_only",
"update_router:add_router_interface": "rule:admin_or_owner",
"update_router:remove_router_interface": "rule:admin_or_owner",
"delete_router": "rule:admin_or_owner",

View File

@ -479,6 +479,11 @@ def convert_to_boolean(data):
raise n_exc.InvalidInput(error_message=msg)
def convert_to_boolean_if_not_none(data):
if data is not None:
return convert_to_boolean(data)
def convert_to_int(data):
try:
return int(data)

View File

@ -33,12 +33,17 @@ DEVICE_OWNER_ROUTER_INTF = "network:router_interface"
DEVICE_OWNER_ROUTER_GW = "network:router_gateway"
DEVICE_OWNER_FLOATINGIP = "network:floatingip"
DEVICE_OWNER_DHCP = "network:dhcp"
DEVICE_OWNER_DVR_INTERFACE = "network:router_interface_distributed"
DEVICE_OWNER_AGENT_GW = "network:floatingip_agent_gateway"
DEVICE_OWNER_ROUTER_SNAT = "network:router_centralized_snat"
DEVICE_ID_RESERVED_DHCP_PORT = "reserved_dhcp_port"
FLOATINGIP_KEY = '_floatingips'
INTERFACE_KEY = '_interfaces'
METERING_LABEL_KEY = '_metering_labels'
FLOATINGIP_AGENT_INTF_KEY = '_floatingip_agent_interfaces'
SNAT_ROUTER_INTF_KEY = '_snat_router_interfaces'
IPv4 = 'IPv4'
IPv6 = 'IPv6'
@ -93,6 +98,7 @@ PORT_BINDING_EXT_ALIAS = 'binding'
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
L3_DISTRIBUTED_EXT_ALIAS = 'dvr'
# Protocol names and numbers for Security Groups/Firewalls
PROTO_NAME_TCP = 'tcp'

78
neutron/db/l3_attrs_db.py Normal file
View File

@ -0,0 +1,78 @@
# Copyright (c) 2014 OpenStack Foundation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron.db import model_base
from neutron.extensions import l3
class RouterExtraAttributes(model_base.BASEV2):
"""Additional attributes for a Virtual Router."""
# NOTE(armando-migliaccio): this model can be a good place to
# add extension attributes to a Router model. Each case needs
# to be individually examined, however 'distributed' and other
# simple ones fit the pattern well.
__tablename__ = "router_extra_attributes"
router_id = sa.Column(sa.String(36),
sa.ForeignKey('routers.id', ondelete="CASCADE"),
primary_key=True)
# Whether the router is a legacy (centralized) or a distributed one
distributed = sa.Column(sa.Boolean, default=False,
server_default=sa.sql.false(),
nullable=False)
router = orm.relationship(
l3_db.Router,
backref=orm.backref("extra_attributes", lazy='joined',
uselist=False, cascade='delete'))
class ExtraAttributesMixin(object):
"""Mixin class to enable router's extra attributes."""
extra_attributes = []
def _extend_extra_router_dict(self, router_res, router_db):
extra_attrs = router_db['extra_attributes']
for attr in self.extra_attributes:
name = attr['name']
default = attr['default']
router_res[name] = (
extra_attrs and extra_attrs[name] or default)
def _get_extra_attributes(self, router, extra_attributes):
return (dict((attr['name'],
router.get(attr['name'], attr['default']))
for attr in extra_attributes))
def _process_extra_attr_router_create(
self, context, router_db, router_req):
kwargs = self._get_extra_attributes(router_req, self.extra_attributes)
# extra_attributes reference is populated via backref
if not router_db['extra_attributes']:
attributes_db = RouterExtraAttributes(
router_id=router_db['id'], **kwargs)
context.session.add(attributes_db)
router_db['extra_attributes'] = attributes_db
else:
# The record will exist if RouterExtraAttributes model's
# attributes are added with db migrations over time
router_db['extra_attributes'].update(kwargs)
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
l3.ROUTERS, ['_extend_extra_router_dict'])

View File

@ -125,8 +125,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
self._apply_dict_extend_functions(l3.ROUTERS, res, router)
return self._fields(res, fields)
def _create_router_db(self, context, router, tenant_id, gw_info):
"""Create the DB object and update gw info, if available."""
def _create_router_db(self, context, router, tenant_id):
"""Create the DB object."""
with context.session.begin(subtransactions=True):
# pre-generate id so it will be available when
# configuring external gw port
@ -143,7 +143,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
gw_info = r.pop(EXTERNAL_GW_INFO, None)
tenant_id = self._get_tenant_id_for_create(context, r)
with context.session.begin(subtransactions=True):
router_db = self._create_router_db(context, r, tenant_id, gw_info)
router_db = self._create_router_db(context, r, tenant_id)
if gw_info:
self._update_router_gw_info(context, router_db['id'],
gw_info, router=router_db)
@ -465,6 +465,23 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
'device_owner': owner,
'name': ''}})
def notify_router_interface_action(
self, context, router_id, tenant_id, port_id, subnet_id, action):
l3_method = '%s_router_interface' % action
self.l3_rpc_notifier.routers_updated(context, [router_id], l3_method)
mapping = {'add': 'create', 'remove': 'delete'}
info = {
'id': router_id,
'tenant_id': tenant_id,
'port_id': port_id,
'subnet_id': subnet_id
}
notifier = n_rpc.get_notifier('network')
router_event = 'router.interface.%s' % mapping[action]
notifier.info(context, router_event, {'router_interface': info})
return info
def add_router_interface(self, context, router_id, interface_info):
add_by_port, add_by_sub = self._validate_interface_info(interface_info)
device_owner = self._get_device_owner(context, router_id)
@ -476,16 +493,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
port = self._add_interface_by_subnet(
context, router_id, interface_info['subnet_id'], device_owner)
self.l3_rpc_notifier.routers_updated(
context, [router_id], 'add_router_interface')
info = {'id': router_id,
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
notifier = n_rpc.get_notifier('network')
notifier.info(
context, 'router.interface.create', {'router_interface': info})
return info
return self.notify_router_interface_action(
context, router_id, port['tenant_id'], port['id'],
port['fixed_ips'][0]['subnet_id'], 'add')
def _confirm_router_interface_not_in_use(self, context, router_id,
subnet_id):
@ -553,16 +563,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
port, subnet = self._remove_interface_by_subnet(
context, router_id, subnet_id, device_owner)
self.l3_rpc_notifier.routers_updated(
context, [router_id], 'remove_router_interface')
info = {'id': router_id,
'tenant_id': port['tenant_id'],
'port_id': port['id'],
'subnet_id': subnet['id']}
notifier = n_rpc.get_notifier('network')
notifier.info(
context, 'router.interface.delete', {'router_interface': info})
return info
return self.notify_router_interface_action(
context, router_id, port['tenant_id'], port['id'],
subnet['id'], 'remove')
def _get_floatingip(self, context, id):
try:
@ -695,8 +698,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
return (fip['port_id'], internal_ip_address, router_id)
def _update_fip_assoc(self, context, fip, floatingip_db, external_port):
previous_router_id = floatingip_db.router_id
def _check_and_get_fip_assoc(self, context, fip, floatingip_db):
port_id = internal_ip_address = router_id = None
if (('fixed_ip_address' in fip and fip['fixed_ip_address']) and
not ('port_id' in fip and fip['port_id'])):
@ -721,6 +723,12 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
net_id=floatingip_db['floating_network_id'])
except exc.NoResultFound:
pass
return port_id, internal_ip_address, router_id
def _update_fip_assoc(self, context, fip, floatingip_db, external_port):
previous_router_id = floatingip_db.router_id
port_id, internal_ip_address, router_id = (
self._check_and_get_fip_assoc(context, fip, floatingip_db))
floatingip_db.update({'fixed_ip_address': internal_ip_address,
'fixed_port_id': port_id,
'router_id': router_id,
@ -1017,10 +1025,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
else:
port['extra_subnets'].append(subnet_info)
def _process_sync_data(self, routers, interfaces, floating_ips):
routers_dict = {}
for router in routers:
routers_dict[router['id']] = router
def _process_floating_ips(self, context, routers_dict, floating_ips):
for floating_ip in floating_ips:
router = routers_dict.get(floating_ip['router_id'])
if router:
@ -1028,13 +1033,14 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
[])
router_floatingips.append(floating_ip)
router[l3_constants.FLOATINGIP_KEY] = router_floatingips
def _process_interfaces(self, routers_dict, interfaces):
for interface in interfaces:
router = routers_dict.get(interface['device_id'])
if router:
router_interfaces = router.get(l3_constants.INTERFACE_KEY, [])
router_interfaces.append(interface)
router[l3_constants.INTERFACE_KEY] = router_interfaces
return routers_dict.values()
def _get_router_info_list(self, context, router_ids=None, active=None,
device_owners=None):
@ -1052,4 +1058,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
def get_sync_data(self, context, router_ids=None, active=None):
routers, interfaces, floating_ips = self._get_router_info_list(
context, router_ids=router_ids, active=active)
return self._process_sync_data(routers, interfaces, floating_ips)
routers_dict = dict((router['id'], router) for router in routers)
self._process_floating_ips(context, routers_dict, floating_ips)
self._process_interfaces(routers_dict, interfaces)
return routers_dict.values()

496
neutron/db/l3_dvr_db.py Normal file
View File

@ -0,0 +1,496 @@
# Copyright (c) 2014 OpenStack Foundation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.common import constants as l3_const
from neutron.common import exceptions as n_exc
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
DEVICE_OWNER_DVR_INTERFACE = l3_const.DEVICE_OWNER_DVR_INTERFACE
DEVICE_OWNER_DVR_SNAT = l3_const.DEVICE_OWNER_ROUTER_SNAT
FLOATINGIP_AGENT_INTF_KEY = l3_const.FLOATINGIP_AGENT_INTF_KEY
DEVICE_OWNER_AGENT_GW = l3_const.DEVICE_OWNER_AGENT_GW
SNAT_ROUTER_INTF_KEY = l3_const.SNAT_ROUTER_INTF_KEY
router_distributed_opts = [
cfg.BoolOpt('router_distributed',
default=False,
help=_("System-wide flag to determine the type of router "
"that tenants can create. Only admin can override.")),
]
cfg.CONF.register_opts(router_distributed_opts)
class L3_NAT_with_dvr_db_mixin(l3_db.L3_NAT_db_mixin,
l3_attrs_db.ExtraAttributesMixin):
"""Mixin class to enable DVR support."""
router_device_owners = (
l3_db.L3_NAT_db_mixin.router_device_owners +
(DEVICE_OWNER_DVR_INTERFACE,))
extra_attributes = (
l3_attrs_db.ExtraAttributesMixin.extra_attributes + [{
'name': "distributed",
'default': cfg.CONF.router_distributed
}])
def _create_router_db(self, context, router, tenant_id):
"""Create a router db object with dvr additions."""
router['distributed'] = _is_distributed_router(router)
with context.session.begin(subtransactions=True):
router_db = super(
L3_NAT_with_dvr_db_mixin, self)._create_router_db(
context, router, tenant_id)
self._process_extra_attr_router_create(context, router_db, router)
return router_db
def _validate_router_migration(self, router_db, router_res):
"""Allow centralized -> distributed state transition only."""
if (router_db.extra_attributes.distributed and
router_res.get('distributed') is False):
LOG.info(_("Centralizing distributed router %s "
"is not supported"), router_db['id'])
raise NotImplementedError()
def _update_distributed_attr(
self, context, router_id, router_db, data, gw_info):
"""Update the model to support the dvr case of a router."""
if not attributes.is_attr_set(gw_info) and data.get('distributed'):
admin_ctx = context.elevated()
filters = {'device_id': [router_id],
'device_owner': [l3_const.DEVICE_OWNER_ROUTER_INTF]}
ports = self._core_plugin.get_ports(admin_ctx, filters=filters)
for p in ports:
port_db = self._core_plugin._get_port(admin_ctx, p['id'])
port_db.update({'device_owner': DEVICE_OWNER_DVR_INTERFACE})
def _update_router_db(self, context, router_id, data, gw_info):
with context.session.begin(subtransactions=True):
router_db = super(
L3_NAT_with_dvr_db_mixin, self)._update_router_db(
context, router_id, data, gw_info)
self._validate_router_migration(router_db, data)
# FIXME(swami): need to add migration status so that the scheduler
# can pick the migration request and move stuff over. For now
# only the distributed flag and router interface's owner change.
# Instead of complaining on _validate_router_migration, let's
# succeed here and complete the task in a follow-up patch
router_db.extra_attributes.update(data)
self._update_distributed_attr(
context, router_id, router_db, data, gw_info)
return router_db
def _delete_current_gw_port(self, context, router_id, router, new_network):
super(L3_NAT_with_dvr_db_mixin,
self)._delete_current_gw_port(context, router_id,
router, new_network)
if router.extra_attributes.distributed:
self.delete_csnat_router_interface_ports(
context.elevated(), router)
def _create_gw_port(self, context, router_id, router, new_network):
super(L3_NAT_with_dvr_db_mixin,
self)._create_gw_port(context, router_id,
router, new_network)
if router.extra_attributes.distributed:
snat_p_list = self.create_snat_intf_ports_if_not_exists(
context.elevated(), router['id'])
if not snat_p_list:
LOG.debug("SNAT interface ports not created: %s", snat_p_list)
def _get_device_owner(self, context, router=None):
"""Get device_owner for the specified router."""
router_is_uuid = isinstance(router, basestring)
if router_is_uuid:
router = self._get_router(context, router)
if _is_distributed_router(router):
return DEVICE_OWNER_DVR_INTERFACE
return super(L3_NAT_with_dvr_db_mixin,
self)._get_device_owner(context, router)
def _get_interface_ports_for_network(self, context, network_id):
router_intf_qry = (context.session.query(models_v2.Port).
filter_by(network_id=network_id))
return (router_intf_qry.
filter(models_v2.Port.device_owner.in_(
[l3_const.DEVICE_OWNER_ROUTER_INTF,
DEVICE_OWNER_DVR_INTERFACE])))
def _update_fip_assoc(self, context, fip, floatingip_db, external_port):
previous_router_id = floatingip_db.router_id
port_id, internal_ip_address, router_id = (
self._check_and_get_fip_assoc(context, fip, floatingip_db))
agt_gw_port_check = False
admin_ctx = context.elevated()
if (not ('port_id' in fip and fip['port_id'])) and (
floatingip_db['fixed_port_id'] is not None):
port_db = self._core_plugin._get_port(
context, floatingip_db['fixed_port_id'])
LOG.debug("VM Port info: %s", port_db)
fip_hostid = self.get_vm_port_hostid(context, port_db['id'])
if fip_hostid:
agt_gw_port_check = self.check_fips_availability_on_host(
admin_ctx, fip['id'], fip_hostid)
floatingip_db.update({'fixed_ip_address': internal_ip_address,
'fixed_port_id': port_id,
'router_id': router_id,
'last_known_router_id': previous_router_id})
if agt_gw_port_check:
LOG.debug('Deleting the Agent GW Port')
self.delete_floatingip_agent_gateway_port(admin_ctx, fip_hostid)
def add_router_interface(self, context, router_id, interface_info):
add_by_port, add_by_sub = self._validate_interface_info(interface_info)
router = self._get_router(context, router_id)
device_owner = self._get_device_owner(context, router)
if add_by_port:
port = self._add_interface_by_port(
context, router_id, interface_info['port_id'], device_owner)
elif add_by_sub:
port = self._add_interface_by_subnet(
context, router_id, interface_info['subnet_id'], device_owner)
if router.extra_attributes.distributed and router.gw_port:
self.add_csnat_router_interface_port(
context.elevated(), router_id, port['network_id'],
port['fixed_ips'][0]['subnet_id'])
return self.notify_router_interface_action(
context, router_id, port['tenant_id'], port['id'],
port['fixed_ips'][0]['subnet_id'], 'add')
def remove_router_interface(self, context, router_id, interface_info):
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise n_exc.BadRequest(resource='router', msg=msg)
port_id = interface_info.get('port_id')
subnet_id = interface_info.get('subnet_id')
router = self._get_router(context, router_id)
device_owner = self._get_device_owner(context, router)
if port_id:
port, subnet = self._remove_interface_by_port(
context, router_id, port_id, subnet_id, device_owner)
elif subnet_id:
port, subnet = self._remove_interface_by_subnet(
context, router_id, subnet_id, device_owner)
if router.extra_attributes.distributed and router.gw_port:
self.delete_csnat_router_interface_ports(
context.elevated(), router, subnet_id=subnet_id)
return self.notify_router_interface_action(
context, router_id, port['tenant_id'], port['id'],
subnet['id'], 'remove')
def get_snat_sync_interfaces(self, context, router_ids):
"""Query router interfaces that relate to list of router_ids."""
if not router_ids:
return []
filters = {'device_id': router_ids,
'device_owner': [DEVICE_OWNER_DVR_SNAT]}
interfaces = self._core_plugin.get_ports(context, filters)
LOG.debug("Return the SNAT ports: %s", interfaces)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
return interfaces
def _process_routers(self, context, routers):
routers_dict = {}
for router in routers:
routers_dict[router['id']] = router
router_ids = [router['id']]
if router['gw_port_id']:
snat_router_intfs = self.get_snat_sync_interfaces(context,
router_ids)
LOG.debug("SNAT ports returned: %s ", snat_router_intfs)
router[SNAT_ROUTER_INTF_KEY] = snat_router_intfs
return routers_dict
def _process_floating_ips(self, context, routers_dict, floating_ips):
for floating_ip in floating_ips:
router = routers_dict.get(floating_ip['router_id'])
if router:
router_floatingips = router.get(l3_const.FLOATINGIP_KEY, [])
floatingip_agent_intfs = []
if router['distributed']:
floating_ip['host'] = self.get_vm_port_hostid(
context, floating_ip['port_id'])
LOG.debug("Floating IP host: %s", floating_ip['host'])
fip_agent = self._get_agent_by_type_and_host(
context, l3_const.AGENT_TYPE_L3,
floating_ip['host'])
LOG.debug("FIP Agent : %s ", fip_agent['id'])
floatingip_agent_intfs = self.get_fip_sync_interfaces(
context, fip_agent['id'])
LOG.debug("FIP Agent ports: %s", floatingip_agent_intfs)
router_floatingips.append(floating_ip)
router[l3_const.FLOATINGIP_KEY] = router_floatingips
router[l3_const.FLOATINGIP_AGENT_INTF_KEY] = (
floatingip_agent_intfs)
def get_fip_sync_interfaces(self, context, fip_agent_id):
"""Query router interfaces that relate to list of router_ids."""
if not fip_agent_id:
return []
filters = {'device_id': [fip_agent_id],
'device_owner': [DEVICE_OWNER_AGENT_GW]}
interfaces = self._core_plugin.get_ports(context.elevated(), filters)
LOG.debug("Return the FIP ports: %s ", interfaces)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
return interfaces
def get_sync_data(self, context, router_ids=None, active=None):
routers, interfaces, floating_ips = self._get_router_info_list(
context, router_ids=router_ids, active=active,
device_owners=[l3_const.DEVICE_OWNER_ROUTER_INTF,
DEVICE_OWNER_DVR_INTERFACE])
# Add the port binding host to the floatingip dictionary
for fip in floating_ips:
fip['host'] = self.get_vm_port_hostid(context, fip['port_id'])
routers_dict = self._process_routers(context, routers)
self._process_floating_ips(context, routers_dict, floating_ips)
self._process_interfaces(routers_dict, interfaces)
return routers_dict.values()
def get_vm_port_hostid(self, context, port_id, port=None):
"""Return the portbinding host_id."""
vm_port_db = port or self._core_plugin.get_port(context, port_id)
allowed_device_owners = ("neutron:LOADBALANCER", DEVICE_OWNER_AGENT_GW)
device_owner = vm_port_db['device_owner'] if vm_port_db else ""
if (device_owner in allowed_device_owners or
device_owner.startswith("compute:")):
return vm_port_db[portbindings.HOST_ID]
def get_agent_gw_ports_exist_for_network(
self, context, network_id, host, agent_id):
"""Return agent gw port if exist, or None otherwise."""
if not network_id:
LOG.debug("Network not specified")
return
filters = {
'network_id': network_id,
'device_id': agent_id,
'device_owner': DEVICE_OWNER_AGENT_GW
}
ports = self._core_plugin.get_ports(context, filters)
if ports:
return ports[0]
def check_fips_availability_on_host(self, context, fip_id, host_id):
"""Query all floating_ips and filter by particular host."""
fip_count_on_host = 0
with context.session.begin(subtransactions=True):
routers = self._get_sync_routers(context, router_ids=None)
router_ids = [router['id'] for router in routers]
floating_ips = self._get_sync_floating_ips(context, router_ids)
# Check for the active floatingip in the host
for fip in floating_ips:
f_host = self.get_vm_port_hostid(context, fip['port_id'])
if f_host == host_id:
fip_count_on_host += 1
# If fip_count greater than 1 or equal to zero no action taken
# if the fip_count is equal to 1, then this would be last active
# fip in the host, so the agent gateway port can be deleted.
if fip_count_on_host == 1:
return True
return False
def delete_floatingip_agent_gateway_port(self, context, host_id):
"""Function to delete the FIP agent gateway port on host."""
# delete any fip agent gw port
device_filter = {'device_owner': [DEVICE_OWNER_AGENT_GW]}
ports = self._core_plugin.get_ports(context,
filters=device_filter)
for p in ports:
if self.get_vm_port_hostid(context, p['id'], p) == host_id:
self._core_plugin._delete_port(context, p['id'])
return
def create_fip_agent_gw_port_if_not_exists(
self, context, network_id, host):
"""Function to return the FIP Agent GW port.
This function will create a FIP Agent GW port
if required. If the port already exists, it
will return the existing port and will not
create a new one.
"""
l3_agent_db = self._get_agent_by_type_and_host(
context, l3_const.AGENT_TYPE_L3, host)
if l3_agent_db:
LOG.debug("Agent ID exists: %s", l3_agent_db['id'])
# TODO(Swami): is this call still valid for external agent gw port?
f_port = self.get_agent_gw_ports_exist_for_network(
context, network_id, host, l3_agent_db['id'])
if not f_port:
LOG.info(_('Agent Gateway port does not exist,'
' so create one: %s'), f_port)
agent_port = self._core_plugin.create_port(
context,
{'port': {'tenant_id': '',
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'device_id': l3_agent_db['id'],
'device_owner': DEVICE_OWNER_AGENT_GW,
'admin_state_up': True,
'name': ''}})
if agent_port:
self._populate_subnet_for_ports(context, [agent_port])
return agent_port
msg = _("Unable to create the Agent Gateway Port")
raise n_exc.BadRequest(resource='router', msg=msg)
else:
self._populate_subnet_for_ports(context, [f_port])
return f_port
def get_snat_interface_ports_for_router(self, context, router_id):
"""Return all existing snat_router_interface ports."""
filters = {'device_id': [router_id],
'device_owner': [DEVICE_OWNER_DVR_SNAT]}
return self._core_plugin.get_ports(context, filters)
def add_csnat_router_interface_port(
self, context, router_id, network_id, subnet_id, do_pop=True):
"""Add SNAT interface to the specified router and subnet."""
snat_port = self._core_plugin.create_port(
context,
{'port': {'tenant_id': '',
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'fixed_ips': [{'subnet_id': subnet_id}],
'device_id': router_id,
'device_owner': DEVICE_OWNER_DVR_SNAT,
'admin_state_up': True,
'name': ''}})
if not snat_port:
msg = _("Unable to create the SNAT Interface Port")
raise n_exc.BadRequest(resource='router', msg=msg)
elif do_pop:
return self._populate_subnet_for_ports(context, [snat_port])
return snat_port
def create_snat_intf_ports_if_not_exists(
self, context, router_id):
"""Function to return the snat interface port list.
This function will return the snat interface port list
if it exists. If the port does not exist it will create
new ports and then return the list.
"""
port_list = self.get_snat_interface_ports_for_router(
context, router_id)
if port_list:
self._populate_subnet_for_ports(context, port_list)
return port_list
port_list = []
filters = {
'device_id': [router_id],
'device_owner': [DEVICE_OWNER_DVR_INTERFACE]}
int_ports = self._core_plugin.get_ports(context, filters)
LOG.info(_('SNAT interface port list does not exist,'
' so create one: %s'), port_list)
for intf in int_ports:
if intf.get('fixed_ips'):
# Passing the subnet for the port to make sure the IP's
# are assigned on the right subnet if multiple subnet
# exists
snat_port = self.add_csnat_router_interface_port(
context, router_id, intf['network_id'],
intf['fixed_ips'][0]['subnet_id'], do_pop=False)
port_list.append(snat_port)
if port_list:
self._populate_subnet_for_ports(context, port_list)
return port_list
def dvr_vmarp_table_update(self, context, port_id, action):
"""Notify the L3 agent of VM ARP table changes.
Provide the details of the VM ARP to the L3 agent when
a Nova instance gets created or deleted.
"""
port_dict = self._core_plugin._get_port(context, port_id)
# Check this is a valid VM port
if ("compute:" not in port_dict['device_owner'] or
not port_dict['fixed_ips']):
return
ip_address = port_dict['fixed_ips'][0]['ip_address']
subnet = port_dict['fixed_ips'][0]['subnet_id']
filters = {'fixed_ips': {'subnet_id': [subnet]}}
ports = self._core_plugin.get_ports(context, filters=filters)
for port in ports:
if port['device_owner'] == DEVICE_OWNER_DVR_INTERFACE:
router_id = port['device_id']
router_dict = self._get_router(context, router_id)
if router_dict.extra_attributes.distributed:
arp_table = {'ip_address': ip_address,
'mac_address': port_dict['mac_address'],
'subnet_id': subnet}
if action == "add":
notify_action = self.l3_rpc_notifier.add_arp_entry
elif action == "del":
notify_action = self.l3_rpc_notifier.del_arp_entry
notify_action(context, router_id, arp_table)
return
def delete_csnat_router_interface_ports(self, context,
router, subnet_id=None):
# Each csnat router interface port is associated
# with a subnet, so we need to pass the subnet id to
# delete the right ports.
device_filter = {
'device_id': [router['id']],
'device_owner': [DEVICE_OWNER_DVR_SNAT]}
c_snat_ports = self._core_plugin.get_ports(
context, filters=device_filter)
for p in c_snat_ports:
if subnet_id is None:
self._core_plugin.delete_port(context,
p['id'],
l3_port_check=False)
else:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
LOG.debug("Subnet matches: %s", subnet_id)
self._core_plugin.delete_port(context,
p['id'],
l3_port_check=False)
def _is_distributed_router(router):
"""Return True if router to be handled is distributed."""
try:
# See if router is a DB object first
requested_router_type = router.extra_attributes.distributed
except AttributeError:
# if not, try to see if it is a request body
requested_router_type = router.get('distributed')
if attributes.is_attr_set(requested_router_type):
return requested_router_type
return cfg.CONF.router_distributed

View File

@ -126,3 +126,56 @@ class L3RpcCallbackMixin(object):
for fip_id in fips_to_disable:
l3_plugin.update_floatingip_status(
context, fip_id, constants.FLOATINGIP_STATUS_DOWN)
def get_ports_by_subnet(self, context, **kwargs):
"""DVR: RPC called by dvr-agent to get all ports for subnet."""
subnet_id = kwargs.get('subnet_id')
LOG.debug("DVR: subnet_id: %s", subnet_id)
filters = {'fixed_ips': {'subnet_id': [subnet_id]}}
plugin = manager.NeutronManager.get_plugin()
return plugin.get_ports(context, filters=filters)
def get_agent_gateway_port(self, context, **kwargs):
"""Get Agent Gateway port for FIP.
l3 agent expects an Agent Gateway Port to be returned
for this query.
"""
network_id = kwargs.get('network_id')
host = kwargs.get('host')
admin_ctx = neutron_context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
l3plugin = manager.NeutronManager.get_service_plugins()[
plugin_constants.L3_ROUTER_NAT]
agent_port = l3plugin.create_fip_agent_gw_port_if_not_exists(
admin_ctx, network_id, host)
self._ensure_host_set_on_port(admin_ctx, plugin, host,
agent_port)
LOG.debug('Agent Gateway port returned : %(agent_port)s with '
'host %(host)s', {'agent_port': agent_port,
'host': host})
return agent_port
def get_snat_router_interface_ports(self, context, **kwargs):
"""Get SNAT serviced Router Port List.
The Service Node that hosts the SNAT service requires
the ports to service the router interfaces.
This function will check if any available ports, if not
it will create ports on the routers interfaces and
will send a list to the L3 agent.
"""
router_id = kwargs.get('router_id')
host = kwargs.get('host')
admin_ctx = neutron_context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
l3plugin = manager.NeutronManager.get_service_plugins()[
plugin_constants.L3_ROUTER_NAT]
snat_port_list = l3plugin.create_snat_intf_port_list_if_not_exists(
admin_ctx, router_id)
for p in snat_port_list:
self._ensure_host_set_on_port(admin_ctx, plugin, host, p)
LOG.debug('SNAT interface ports returned : %(snat_port_list)s '
'and on host %(host)s', {'snat_port_list': snat_port_list,
'host': host})
return snat_port_list

View File

@ -0,0 +1,59 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""L3 extension distributed mode
Revision ID: 3927f7f7c456
Revises: db_healing
Create Date: 2014-04-02 23:26:19.303633
"""
# revision identifiers, used by Alembic.
revision = '3927f7f7c456'
down_revision = 'db_healing'
migration_for_plugins = [
'*'
]
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.create_table(
'router_extra_attributes',
sa.Column('router_id', sa.String(length=36), nullable=False),
sa.Column('distributed', sa.Boolean(), nullable=False,
server_default=sa.sql.false()),
sa.ForeignKeyConstraint(
['router_id'], ['routers.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('router_id')
)
op.execute("INSERT INTO router_extra_attributes SELECT id as router_id, "
"False as distributed from routers")
def downgrade(active_plugins=None, options=None):
if not migration.should_run(active_plugins, migration_for_plugins):
return
op.drop_table('router_extra_attributes')

View File

@ -1 +1 @@
db_healing
3927f7f7c456

View File

@ -29,6 +29,7 @@ from neutron.db import extradhcpopt_db # noqa
from neutron.db import extraroute_db # noqa
from neutron.db.firewall import firewall_db # noqa
from neutron.db import l3_agentschedulers_db # noqa
from neutron.db import l3_attrs_db # noqa
from neutron.db import l3_db # noqa
from neutron.db import l3_gwmode_db # noqa
from neutron.db.loadbalancer import loadbalancer_db # noqa

67
neutron/extensions/dvr.py Normal file
View File

@ -0,0 +1,67 @@
# Copyright (c) 2014 OpenStack Foundation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes
from neutron.common import constants
DISTRIBUTED = 'distributed'
EXTENDED_ATTRIBUTES_2_0 = {
'routers': {
DISTRIBUTED: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': attributes.ATTR_NOT_SPECIFIED,
'convert_to': attributes.convert_to_boolean_if_not_none,
'enforce_policy': True},
}
}
class Dvr(object):
"""Extension class supporting distributed virtual router."""
@classmethod
def get_name(cls):
return "Distributed Virtual Router"
@classmethod
def get_alias(cls):
return constants.L3_DISTRIBUTED_EXT_ALIAS
@classmethod
def get_description(cls):
return "Enables configuration of Distributed Virtual Routers."
@classmethod
def get_namespace(cls):
return ("http://docs.openstack.org/ext/"
"%s/api/v1.0" % constants.L3_DISTRIBUTED_EXT_ALIAS)
@classmethod
def get_updated(cls):
return "2014-06-1T10:00:00-00:00"
def get_required_extensions(self):
return ["router"]
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
return []
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}

View File

@ -25,6 +25,7 @@ from neutron.db import api as qdbapi
from neutron.db import common_db_mixin
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_dvr_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import model_base
@ -35,11 +36,14 @@ from neutron.plugins.common import constants
class L3RouterPluginRpcCallbacks(n_rpc.RpcCallback,
l3_rpc_base.L3RpcCallbackMixin):
RPC_API_VERSION = '1.1'
RPC_API_VERSION = '1.2'
# history
# 1.2 Added methods for DVR support
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin):
@ -49,9 +53,10 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
router and floatingip resources and manages associated
request/response.
All DB related work is implemented in classes
l3_db.L3_NAT_db_mixin and extraroute_db.ExtraRoute_db_mixin.
l3_db.L3_NAT_db_mixin, l3_dvr_db.L3_NAT_with_dvr_db_mixin, and
extraroute_db.ExtraRoute_db_mixin.
"""
supported_extension_aliases = ["router", "ext-gw-mode",
supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
"extraroute", "l3_agent_scheduler"]
def __init__(self):

View File

@ -0,0 +1,136 @@
# Copyright (c) 2014 OpenStack Foundation, all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron.common import constants as l3_const
from neutron import context
from neutron.db import api as db
from neutron.db import l3_dvr_db
from neutron.tests import base
class L3DvrTestCase(base.BaseTestCase):
def setUp(self):
super(L3DvrTestCase, self).setUp()
db.configure_db()
self.ctx = context.get_admin_context()
self.addCleanup(db.clear_db)
self.mixin = l3_dvr_db.L3_NAT_with_dvr_db_mixin()
def _create_router(self, router):
with self.ctx.session.begin(subtransactions=True):
return self.mixin._create_router_db(self.ctx, router, 'foo_tenant')
def _test__create_router_db(self, expected=False, distributed=None):
router = {'name': 'foo_router', 'admin_state_up': True}
if distributed is not None:
router['distributed'] = distributed
result = self._create_router(router)
self.assertEqual(expected, result.extra_attributes['distributed'])
def test_create_router_db_default(self):
self._test__create_router_db(expected=False)
def test_create_router_db_centralized(self):
self._test__create_router_db(expected=False, distributed=False)
def test_create_router_db_distributed(self):
self._test__create_router_db(expected=True, distributed=True)
def test__validate_router_migration_on_router_update(self):
router = {
'name': 'foo_router',
'admin_state_up': True,
'distributed': True
}
router_db = self._create_router(router)
self.assertIsNone(self.mixin._validate_router_migration(
router_db, {'name': 'foo_router_2'}))
def test__validate_router_migration_raise_error(self):
router = {
'name': 'foo_router',
'admin_state_up': True,
'distributed': True
}
router_db = self._create_router(router)
self.assertRaises(NotImplementedError,
self.mixin._validate_router_migration,
router_db, {'distributed': False})
def test_update_router_db_centralized_to_distributed(self):
router = {'name': 'foo_router', 'admin_state_up': True}
distributed = {'distributed': True}
router_db = self._create_router(router)
router_id = router_db['id']
self.assertFalse(router_db.extra_attributes.distributed)
with mock.patch.object(self.mixin, '_update_distributed_attr') as f:
with mock.patch.object(self.mixin, '_get_router') as g:
g.return_value = router_db
router_db = self.mixin._update_router_db(
self.ctx, router_id, distributed, mock.ANY)
# Assert that the DB value has changed
self.assertTrue(router_db.extra_attributes.distributed)
self.assertEqual(1, f.call_count)
def _test_get_device_owner(self, is_distributed=False,
expected=l3_const.DEVICE_OWNER_ROUTER_INTF,
pass_router_id=True):
router = {
'name': 'foo_router',
'admin_state_up': True,
'distributed': is_distributed
}
router_db = self._create_router(router)
router_pass = router_db['id'] if pass_router_id else router_db
with mock.patch.object(self.mixin, '_get_router') as f:
f.return_value = router_db
result = self.mixin._get_device_owner(self.ctx, router_pass)
self.assertEqual(expected, result)
def test_get_device_owner_by_router_id(self):
self._test_get_device_owner()
def test__get_device_owner_centralized(self):
self._test_get_device_owner(pass_router_id=False)
def test__get_device_owner_distributed(self):
self._test_get_device_owner(
is_distributed=True,
expected=l3_dvr_db.DEVICE_OWNER_DVR_INTERFACE,
pass_router_id=False)
def _test__is_distributed_router(self, router, expected):
result = l3_dvr_db._is_distributed_router(router)
self.assertEqual(expected, result)
def test__is_distributed_router_by_db_object(self):
router = {'name': 'foo_router', 'admin_state_up': True}
router_db = self._create_router(router)
self.mixin._get_device_owner(mock.ANY, router_db)
def test__is_distributed_router_default(self):
router = {'id': 'foo_router_id'}
self._test__is_distributed_router(router, False)
def test__is_distributed_router_centralized(self):
router = {'id': 'foo_router_id', 'distributed': False}
self._test__is_distributed_router(router, False)
def test__is_distributed_router_distributed(self):
router = {'id': 'foo_router_id', 'distributed': True}
self._test__is_distributed_router(router, True)

View File

@ -32,6 +32,7 @@ from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import l3_rpc_base
from neutron.db import model_base
from neutron.extensions import external_net
@ -285,6 +286,7 @@ class TestNoL3NatPlugin(TestL3NatBasePlugin):
# A L3 routing service plugin class for tests with plugins that
# delegate away L3 routing functionality
class TestL3NatServicePlugin(common_db_mixin.CommonDbMixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
l3_db.L3_NAT_db_mixin):
supported_extension_aliases = ["router"]
@ -1223,7 +1225,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self._show('floatingips', fip['floatingip']['id'],
expected_code=exc.HTTPNotFound.code)
def _test_floatingip_with_assoc_fails(self, plugin_class):
def _test_floatingip_with_assoc_fails(self, plugin_method):
with self.subnet(cidr='200.0.0.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
@ -1236,8 +1238,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self._router_interface_action('add', r['router']['id'],
private_sub['subnet']['id'],
None)
method = plugin_class + '._update_fip_assoc'
with mock.patch(method) as pl:
with mock.patch(plugin_method) as pl:
pl.side_effect = n_exc.BadRequest(
resource='floatingip',
msg='fake_error')
@ -1260,7 +1261,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_floatingip_with_assoc_fails(self):
self._test_floatingip_with_assoc_fails(
'neutron.db.l3_db.L3_NAT_db_mixin')
'neutron.db.l3_db.L3_NAT_db_mixin._check_and_get_fip_assoc')
def test_floatingip_update(
self, expected_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):

View File

@ -716,7 +716,8 @@ class TestL3NatTestCase(L3NatTest,
self._test_create_l3_ext_network(666)
def test_floatingip_with_assoc_fails(self):
self._test_floatingip_with_assoc_fails(self._plugin_name)
self._test_floatingip_with_assoc_fails(
"%s.%s" % (self._plugin_name, "_update_fip_assoc"))
def test_floatingip_with_invalid_create_port(self):
self._test_floatingip_with_invalid_create_port(self._plugin_name)