l2pop fdb flows for HA router ports

This patch makes L3 HA failover not dependent on neutron components
(during failover).

All HA agents(active and backup) call update_device_up/down after wiring
the ports. But l2pop driver is called for only active agent as port
binding in DB reflects active agent. Then l2pop creates unicast and
multicast flows for active agent.
On failover, flows to new active agent is created. For this to happen -
all of database, messaging server, neutron-server and destination L3
agent should be active during failover. This creates two issues -
1) When any of the above resources (i.e. neutron-server, ...) is dead,
   flows between the new master and other agents won't be created and
   L3 HA failover does not work. In the same scenario, L3 HA failover
   will work if l2pop is disabled.
2) Packet loss during failover is higher as above neutron resources
   interact multiple times, so will take time to create l2 flows.

In this change, we allow plugin to notify l2pop when update_device_up/down
is called by backup agents also. Then l2pop will create flood flows to
all HA agents(both active and slave). L2pop won't create unicast flow for
this port, instead unicast flow is created by learning action of table 10
when keepalived sends GARP after assigning ip address to master router's
qr-xx port. As flood flows are already created and unicast flow is
dynamically added, L3 HA failover does not depend on l2pop.

This solves two issues:
1) with L3 HA + l2pop, failover will work even if any of above agents
   or processes dead.
2) Reduce failover time as we are not depending on neutron to create
   flows during failover.
We use L3HARouterAgentPortBinding table for getting all HA agents of a
router port. HA router port on slave agent is also considered for l2pop
distributed_active_network_ports and agent_network_active_port_count

Conflicts:
        neutron/db/l3_hamode_db.py
        neutron/plugins/ml2/drivers/l2pop/db.py
        neutron/plugins/ml2/drivers/l2pop/mech_driver.py
        neutron/plugins/ml2/rpc.py
        neutron/tests/unit/plugins/ml2/drivers/l2pop/test_db.py
        neutron/tests/unit/plugins/ml2/drivers/l2pop/test_mech_driver.py
        neutron/tests/unit/plugins/ml2/test_rpc.py

Closes-bug: #1522980
Closes-bug: #1602614
Change-Id: Ie1f5289390b3ff3f7f3ed7ffc8f6a8258ee8662e
(cherry picked from commit 26d8702b9d)
This commit is contained in:
venkata anil 2016-08-04 07:14:47 +00:00
parent 664a725142
commit c06ff65dbb
8 changed files with 678 additions and 58 deletions

View File

@ -31,6 +31,7 @@ from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import utils as n_utils
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db.availability_zone import router as router_az_db
from neutron.db import common_db_mixin
from neutron.db import l3_attrs_db
@ -759,3 +760,16 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
n_exc.PortNotFound):
# Take concurrently deleted interfaces in to account
pass
def is_ha_router_port(device_owner, router_id):
    """Return True if the port is an interface/SNAT port of an HA router.

    Only router-interface and SNAT device owners can belong to an HA
    router; for those, the router's extra attributes are checked for
    ha=True.
    """
    session = db_api.get_session()
    ha_capable_owners = (constants.DEVICE_OWNER_ROUTER_INTF,
                         constants.DEVICE_OWNER_ROUTER_SNAT)
    if device_owner not in ha_capable_owners:
        return False
    ha_query = (session.query(l3_attrs_db.RouterExtraAttributes)
                .filter_by(ha=True)
                .filter(l3_attrs_db.RouterExtraAttributes.router_id ==
                        router_id))
    return bool(ha_query.limit(1).count())

View File

@ -18,10 +18,15 @@ from oslo_utils import timeutils
from neutron.common import constants as const
from neutron.db import agents_db
from neutron.db import l3_hamode_db
from neutron.db import models_v2
from neutron.plugins.ml2 import models as ml2_models
HA_ROUTER_PORTS = (const.DEVICE_OWNER_ROUTER_INTF,
const.DEVICE_OWNER_ROUTER_SNAT)
def get_agent_ip_by_host(session, agent_host):
agent = get_agent_by_host(session, agent_host)
if agent:
@ -70,10 +75,28 @@ def _get_active_network_ports(session, network_id):
return query
def get_nondvr_active_network_ports(session, network_id):
def _ha_router_interfaces_on_network_query(session, network_id):
    """Build a query of HA router interface/SNAT ports on a network.

    Ports are joined to L3HARouterAgentPortBinding through the router id
    (Port.device_id), so a port row appears once per hosting L3 agent
    binding.
    """
    query = session.query(models_v2.Port)
    query = query.join(l3_hamode_db.L3HARouterAgentPortBinding,
                       l3_hamode_db.L3HARouterAgentPortBinding.router_id ==
                       models_v2.Port.device_id)
    return query.filter(
        models_v2.Port.network_id == network_id,
        models_v2.Port.device_owner.in_(HA_ROUTER_PORTS))
def _get_ha_router_interface_ids(session, network_id):
    """Return a distinct sub-query of HA router port ids on the network."""
    query = _ha_router_interfaces_on_network_query(session, network_id)
    return query.from_self(models_v2.Port.id).distinct()
def get_nondistributed_active_network_ports(session, network_id):
    """Return (binding, agent) pairs for regular active ports.

    DVR interfaces and HA router ports are excluded here; HA router
    ports are reported separately via get_ha_active_network_ports().
    Agents without a tunneling IP are dropped from the result.
    """
    query = _get_active_network_ports(session, network_id)
    # Exclude DVR and HA router interfaces
    query = query.filter(models_v2.Port.device_owner !=
                         const.DEVICE_OWNER_DVR_INTERFACE)
    ha_iface_ids_query = _get_ha_router_interface_ids(session, network_id)
    query = query.filter(models_v2.Port.id.notin_(ha_iface_ids_query))
    return [(bind, agent) for bind, agent in query.all()
            if get_agent_ip(agent)]
@ -93,6 +116,44 @@ def get_dvr_active_network_ports(session, network_id):
if get_agent_ip(agent)]
def get_distributed_active_network_ports(session, network_id):
    """Return DVR plus HA (binding, agent) pairs for the network."""
    return (get_dvr_active_network_ports(session, network_id) +
            get_ha_active_network_ports(session, network_id))
def get_ha_active_network_ports(session, network_id):
    """Return (None, agent) pairs for the network's HA hosting agents.

    The first tuple element is always None — no per-agent port binding
    row is looked up here; callers only use the agent.
    """
    agents = get_ha_agents(session, network_id=network_id)
    return [(None, agent) for agent in agents]
def get_ha_agents(session, network_id=None, router_id=None):
    """Return L2 agents on hosts whose L3 agents host HA routers.

    Filter either by router_id (all hosting agents of that router) or,
    when router_id is not given, by network_id (hosting agents of HA
    routers that have an ACTIVE HA port on that network). With neither
    filter the result is []. Agents without a tunneling IP are dropped.
    """
    query = session.query(agents_db.Agent.host).distinct()
    query = query.join(l3_hamode_db.L3HARouterAgentPortBinding,
                       l3_hamode_db.L3HARouterAgentPortBinding.l3_agent_id ==
                       agents_db.Agent.id)
    if router_id:
        query = query.filter(
            l3_hamode_db.L3HARouterAgentPortBinding.router_id == router_id)
    elif network_id:
        query = query.join(models_v2.Port, models_v2.Port.device_id ==
                           l3_hamode_db.L3HARouterAgentPortBinding.router_id)
        query = query.filter(models_v2.Port.network_id == network_id,
                             models_v2.Port.status ==
                             const.PORT_STATUS_ACTIVE,
                             models_v2.Port.device_owner.in_(HA_ROUTER_PORTS))
    else:
        return []
    # L3HARouterAgentPortBinding will have l3 agent ids of hosting agents.
    # But we need l2 agent(for tunneling ip) while creating FDB entries.
    agents_query = session.query(agents_db.Agent)
    agents_query = agents_query.filter(agents_db.Agent.host.in_(query))
    return [agent for agent in agents_query
            if get_agent_ip(agent)]
def get_ha_agents_by_router_id(session, router_id):
    """Return L2 agents on hosts of the L3 agents hosting this router."""
    return get_ha_agents(session, router_id=router_id)
def get_agent_network_active_port_count(session, agent_host,
network_id):
with session.begin(subtransactions=True):
@ -104,11 +165,27 @@ def get_agent_network_active_port_count(session, agent_host,
models_v2.Port.device_owner !=
const.DEVICE_OWNER_DVR_INTERFACE,
ml2_models.PortBinding.host == agent_host)
ha_iface_ids_query = _get_ha_router_interface_ids(session, network_id)
query1 = query1.filter(models_v2.Port.id.notin_(ha_iface_ids_query))
ha_port_count = get_ha_router_active_port_count(
session, agent_host, network_id)
query2 = query.join(ml2_models.DVRPortBinding)
query2 = query2.filter(models_v2.Port.network_id == network_id,
ml2_models.DVRPortBinding.status ==
const.PORT_STATUS_ACTIVE,
models_v2.Port.device_owner ==
const.DEVICE_OWNER_DVR_INTERFACE,
ml2_models.DVRPortBinding.host == agent_host)
return (query1.count() + query2.count())
ml2_models.DVRPortBinding.host ==
agent_host)
return (query1.count() + query2.count() + ha_port_count)
def get_ha_router_active_port_count(session, agent_host, network_id):
    """Return num of ACTIVE HA router interfaces on the network and host."""
    query = _ha_router_interfaces_on_network_query(session, network_id)
    query = query.filter(models_v2.Port.status == const.PORT_STATUS_ACTIVE)
    # NOTE(review): join(Agent) presumably follows the
    # L3HARouterAgentPortBinding.l3_agent_id relationship — confirm.
    query = query.join(agents_db.Agent)
    query = query.filter(agents_db.Agent.host == agent_host)
    return query.count()

View File

@ -20,6 +20,9 @@ from neutron._i18n import _LW
from neutron.common import constants as const
from neutron import context as n_context
from neutron.db import api as db_api
from neutron.db import l3_hamode_db
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers.l2pop import config # noqa
@ -48,11 +51,30 @@ class L2populationMechanismDriver(api.MechanismDriver):
ip_address=ip['ip_address'])
for ip in port['fixed_ips']]
def _get_ha_port_agents_fdb(
        self, session, network_id, router_id):
    """Map agent tunnel IP -> [FLOODING_ENTRY] for idle HA hosting agents.

    Only HA hosting agents that have no remaining active port on the
    network (agent_active_ports == 0) are included, so that only their
    flood flows are torn down.
    """
    other_fdb_ports = {}
    for agent in l2pop_db.get_ha_agents_by_router_id(session, router_id):
        agent_active_ports = l2pop_db.get_agent_network_active_port_count(
            session, agent.host, network_id)
        if agent_active_ports == 0:
            ip = l2pop_db.get_agent_ip(agent)
            other_fdb_ports[ip] = [const.FLOODING_ENTRY]
    return other_fdb_ports
def delete_port_postcommit(self, context):
    """Notify agents to remove fdb entries for the deleted port.

    For HA router ports the per-port entries are replaced by flooding
    entries for every HA hosting agent that has no other active port
    left on the network.
    """
    port = context.current
    agent_host = context.host
    fdb_entries = self._get_agent_fdb(context.bottom_bound_segment,
                                      port, agent_host)
    session = db_api.get_session()
    if port['device_owner'] in l2pop_db.HA_ROUTER_PORTS:
        network_id = port['network_id']
        other_fdb_ports = self._get_ha_port_agents_fdb(
            session, network_id, port['device_id'])
        # NOTE(review): _get_agent_fdb may return None (e.g. no bound
        # segment) — confirm fdb_entries is non-None before indexing.
        fdb_entries[network_id]['ports'] = other_fdb_ports
    self.L2populationAgentNotify.remove_fdb_entries(self.rpc_ctx,
                                                    fdb_entries)
@ -115,13 +137,15 @@ class L2populationMechanismDriver(api.MechanismDriver):
def update_port_postcommit(self, context):
port = context.current
orig = context.original
if l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id']):
return
diff_ips = self._get_diff_ips(orig, port)
if diff_ips:
self._fixed_ips_changed(context, orig, port, diff_ips)
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
if context.status == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
self.update_port_up(context)
if context.status == const.PORT_STATUS_DOWN:
agent_host = context.host
fdb_entries = self._get_agent_fdb(
@ -140,7 +164,7 @@ class L2populationMechanismDriver(api.MechanismDriver):
self.rpc_ctx, fdb_entries)
elif context.status != context.original_status:
if context.status == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
self.update_port_up(context)
elif context.status == const.PORT_STATUS_DOWN:
fdb_entries = self._get_agent_fdb(
context.bottom_bound_segment, port, context.host)
@ -167,9 +191,10 @@ class L2populationMechanismDriver(api.MechanismDriver):
'network_type': segment['network_type'],
'ports': {}}}
tunnel_network_ports = (
l2pop_db.get_dvr_active_network_ports(session, network_id))
l2pop_db.get_distributed_active_network_ports(session, network_id))
fdb_network_ports = (
l2pop_db.get_nondvr_active_network_ports(session, network_id))
l2pop_db.get_nondistributed_active_network_ports(
session, network_id))
ports = agent_fdb_entries[network_id]['ports']
ports.update(self._get_tunnels(
fdb_network_ports + tunnel_network_ports,
@ -198,7 +223,24 @@ class L2populationMechanismDriver(api.MechanismDriver):
return agents
def _update_port_up(self, context):
def update_port_down(self, context):
    """Remove the port's fdb entries unless its router is still hosted here.

    When an HA router transitions an agent to backup, the HA port is
    reported down on that agent but its flood flows must be kept; so if
    the L3 plugin reports the agent still hosts the port's router, skip
    the removal entirely.
    """
    port = context.current
    agent_host = context.host
    l3plugin = manager.NeutronManager.get_service_plugins().get(
        service_constants.L3_ROUTER_NAT)
    # when agent transitions to backup, don't remove flood flows
    if agent_host and l3plugin and getattr(
            l3plugin, "list_router_ids_on_host", None):
        admin_context = n_context.get_admin_context()
        if l3plugin.list_router_ids_on_host(
                admin_context, agent_host, [port['device_id']]):
            return
    fdb_entries = self._get_agent_fdb(
        context.bottom_bound_segment, port, agent_host)
    self.L2populationAgentNotify.remove_fdb_entries(
        self.rpc_ctx, fdb_entries)
def update_port_up(self, context):
port = context.current
agent_host = context.host
session = db_api.get_session()
@ -238,7 +280,9 @@ class L2populationMechanismDriver(api.MechanismDriver):
self.rpc_ctx, agent_fdb_entries, agent_host)
# Notify other agents to add fdb rule for current port
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and
not l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id'])):
other_fdb_ports[agent_ip] += self._get_port_fdb_entries(port)
self.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
@ -267,7 +311,9 @@ class L2populationMechanismDriver(api.MechanismDriver):
other_fdb_entries[network_id]['ports'][agent_ip].append(
const.FLOODING_ENTRY)
# Notify other agents to remove fdb rules for current port
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
if (port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE and
not l3_hamode_db.is_ha_router_port(port['device_owner'],
port['device_id'])):
fdb_entries = self._get_port_fdb_entries(port)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries

View File

@ -27,9 +27,11 @@ from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import l3_hamode_db
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron import manager
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
from neutron.services.qos import qos_consts
@ -182,16 +184,18 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
return {'device': device,
'exists': port_exists}
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being executed "
"concurrently. Ignoring StaleDataError.")
else:
try:
port_exists = bool(plugin.update_port_status(
rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being "
"executed concurrently. Ignoring StaleDataError.")
return {'device': device,
'exists': port_exists}
self.notify_ha_port_status(port_id, rpc_context,
n_const.PORT_STATUS_DOWN, host)
return {'device': device,
'exists': port_exists}
@ -210,8 +214,13 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
LOG.debug("Device %(device)s not bound to the"
" agent host %(host)s",
{'device': device, 'host': host})
return
else:
self.update_port_status_to_active(rpc_context, port_id, host)
self.notify_ha_port_status(port_id, rpc_context,
n_const.PORT_STATUS_ACTIVE, host)
def update_port_status_to_active(self, rpc_context, port_id, host):
plugin = manager.NeutronManager.get_plugin()
port_id = plugin.update_port_status(rpc_context, port_id,
n_const.PORT_STATUS_ACTIVE,
host)
@ -231,6 +240,28 @@ class RpcCallbacks(type_tunnel.TunnelRpcCallbackMixin):
registry.notify(
resources.PORT, events.AFTER_UPDATE, plugin, **kwargs)
def notify_ha_port_status(self, port_id, rpc_context,
                          status, host):
    """Drive l2pop directly for HA router ports on device up/down.

    Called from update_device_up/down for every reporting agent (active
    and backup) so flood flows to all HA hosting agents exist regardless
    of which agent owns the DB port binding. No-op when the l2population
    driver is not loaded, the port no longer exists, or the port is not
    an HA router port.
    """
    plugin = manager.NeutronManager.get_plugin()
    l2pop_driver = plugin.mechanism_manager.mech_drivers.get(
        'l2population')
    if not l2pop_driver:
        return
    port = ml2_db.get_port(rpc_context.session, port_id)
    if not port:
        return
    is_ha_port = l3_hamode_db.is_ha_router_port(port['device_owner'],
                                                port['device_id'])
    if is_ha_port:
        # Overwrite status/host on the bound context so l2pop acts on
        # behalf of the reporting agent rather than the bound host.
        port_context = plugin.get_bound_port_context(
            rpc_context, port_id)
        port_context.current['status'] = status
        port_context.current[portbindings.HOST_ID] = host
        if status == n_const.PORT_STATUS_ACTIVE:
            l2pop_driver.obj.update_port_up(port_context)
        else:
            l2pop_driver.obj.update_port_down(port_context)
def update_device_list(self, rpc_context, **kwargs):
devices_up = []
failed_devices_up = []

View File

@ -1097,6 +1097,39 @@ class L3HAModeDbTestCase(L3HATestFramework):
port = self._get_first_interface(router['id'])
self.assertEqual(self.agent1['host'], port[portbindings.HOST_ID])
def test_is_ha_router_port(self):
    """An interface port of an HA router is detected as an HA port."""
    network_id = self._create_network(self.core_plugin, self.admin_ctx)
    subnet = self._create_subnet(self.core_plugin, self.admin_ctx,
                                 network_id)
    interface_info = {'subnet_id': subnet['id']}
    router = self._create_router()
    self.plugin.add_router_interface(self.admin_ctx,
                                     router['id'],
                                     interface_info)
    port = self._get_first_interface(router['id'])
    self.assertTrue(l3_hamode_db.is_ha_router_port(
        port['device_owner'], port['device_id']))
def test_is_ha_router_port_for_normal_port(self):
    """An interface port of a non-HA router is not an HA port."""
    network_id = self._create_network(self.core_plugin, self.admin_ctx)
    subnet = self._create_subnet(self.core_plugin, self.admin_ctx,
                                 network_id)
    interface_info = {'subnet_id': subnet['id']}
    router = self._create_router(ha=False)
    self.plugin.add_router_interface(self.admin_ctx,
                                     router['id'],
                                     interface_info)
    device_filter = {'device_id': [router['id']],
                     'device_owner':
                     [constants.DEVICE_OWNER_ROUTER_INTF]}
    port = self.core_plugin.get_ports(
        self.admin_ctx, filters=device_filter)[0]
    self.assertFalse(l3_hamode_db.is_ha_router_port(
        port['device_owner'], port['device_id']))
class L3HAUserTestCase(L3HATestFramework):

View File

@ -13,22 +13,73 @@
# under the License.
from neutron.common import constants
from oslo_utils import uuidutils
from neutron.common import constants as n_const
from neutron.common import utils
from neutron import context
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_hamode_db
from neutron.db import models_v2
from neutron.extensions import portbindings
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
from neutron.plugins.ml2 import models
from neutron.tests.common import helpers
from neutron.tests import tools
from neutron.tests.unit import testlib_api
HOST = helpers.HOST
HOST_2 = 'HOST_2'
HOST_3 = 'HOST_3'
HOST_2_TUNNELING_IP = '20.0.0.2'
HOST_3_TUNNELING_IP = '20.0.0.3'
TEST_ROUTER_ID = 'router_id'
TEST_NETWORK_ID = 'network_id'
TEST_HA_NETWORK_ID = 'ha_network_id'
class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
def setUp(self):
    """Create an admin context and the default test network."""
    super(TestL2PopulationDBTestCase, self).setUp()
    self.ctx = context.get_admin_context()
    self._create_network()
def _create_network(self, network_id=TEST_NETWORK_ID):
    """Insert a bare network row with the given id."""
    with self.ctx.session.begin(subtransactions=True):
        self.ctx.session.add(models_v2.Network(id=network_id))
def _create_router(self, distributed=True, ha=False):
    """Insert TEST_ROUTER_ID with the given distributed/ha attributes."""
    with self.ctx.session.begin(subtransactions=True):
        self.ctx.session.add(l3_db.Router(id=TEST_ROUTER_ID))
        self.ctx.session.add(l3_attrs_db.RouterExtraAttributes(
            router_id=TEST_ROUTER_ID, distributed=distributed, ha=ha))
def _create_ha_router(self, distributed=False):
    """Create an HA router with HA ports active on HOST, standby on HOST_2.

    HOST_3 gets l3/ovs agents too but hosts no HA router, so tests can
    assert it is excluded from HA agent lookups.
    """
    helpers.register_l3_agent(HOST_2)
    helpers.register_ovs_agent(HOST_2, tunneling_ip=HOST_2_TUNNELING_IP)
    # Register l3 agent on host3, which doesn't host any HA router.
    # Tests should test that host3 is not a HA agent host.
    helpers.register_l3_agent(HOST_3)
    helpers.register_ovs_agent(HOST_3, tunneling_ip=HOST_3_TUNNELING_IP)
    with self.ctx.session.begin(subtransactions=True):
        self.ctx.session.add(models_v2.Network(id=TEST_HA_NETWORK_ID))
    self._create_router(distributed=distributed, ha=True)
    for state, host in [(n_const.HA_ROUTER_STATE_ACTIVE, HOST),
                        (n_const.HA_ROUTER_STATE_STANDBY, HOST_2)]:
        self._setup_port_binding(
            network_id=TEST_HA_NETWORK_ID,
            device_owner=constants.DEVICE_OWNER_ROUTER_HA_INTF,
            device_id=TEST_ROUTER_ID,
            host_state=state,
            host=host)
def get_l3_agent_by_host(self, agent_host):
    """Look up the L3 agent registered on agent_host."""
    plugin = helpers.FakePlugin()
    return plugin._get_agent_by_type_and_host(
        self.ctx, constants.AGENT_TYPE_L3, agent_host)
def test_get_agent_by_host(self):
# Register a L2 agent + A bunch of other agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
@ -37,58 +88,70 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
def test_get_agent_by_host_no_candidate(self):
# Register a bunch of non-L2 agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
agent = l2pop_db.get_agent_by_host(
self.ctx.session, helpers.HOST)
self.assertIsNone(agent)
def _setup_port_binding(self, network_id='network_id', dvr=True):
def _setup_port_binding(self, **kwargs):
with self.ctx.session.begin(subtransactions=True):
self.ctx.session.add(models_v2.Network(id=network_id))
device_owner = constants.DEVICE_OWNER_DVR_INTERFACE if dvr else ''
mac = utils.get_random_mac('fa:16:3e:00:00:00'.split(':'))
port_id = uuidutils.generate_uuid()
network_id = kwargs.get('network_id', TEST_NETWORK_ID)
device_owner = kwargs.get('device_owner', '')
device_id = kwargs.get('device_id', '')
host = kwargs.get('host', helpers.HOST)
self.ctx.session.add(models_v2.Port(
id='port_id',
network_id=network_id,
mac_address='00:11:22:33:44:55',
admin_state_up=True,
status=constants.PORT_STATUS_ACTIVE,
device_id='',
device_owner=device_owner))
port_binding_cls = (models.DVRPortBinding if dvr
else models.PortBinding)
binding_kwarg = {
'port_id': 'port_id',
'host': helpers.HOST,
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL
}
if dvr:
binding_kwarg['router_id'] = 'router_id'
id=port_id, network_id=network_id, mac_address=mac,
admin_state_up=True, status=constants.PORT_STATUS_ACTIVE,
device_id=device_id, device_owner=device_owner))
port_binding_cls = models.PortBinding
binding_kwarg = {'port_id': port_id,
'host': host,
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL}
if device_owner == constants.DEVICE_OWNER_DVR_INTERFACE:
port_binding_cls = models.DVRPortBinding
binding_kwarg['router_id'] = TEST_ROUTER_ID
binding_kwarg['status'] = constants.PORT_STATUS_DOWN
self.ctx.session.add(port_binding_cls(**binding_kwarg))
if network_id == TEST_HA_NETWORK_ID:
agent = self.get_l3_agent_by_host(host)
haport_bindings_cls = l3_hamode_db.L3HARouterAgentPortBinding
habinding_kwarg = {'port_id': port_id,
'router_id': device_id,
'l3_agent_id': agent['id'],
'state': kwargs.get('host_state',
n_const.HA_ROUTER_STATE_ACTIVE)}
self.ctx.session.add(haport_bindings_cls(**habinding_kwarg))
def test_get_dvr_active_network_ports(self):
self._setup_port_binding()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_DVR_INTERFACE)
# Register a L2 agent + A bunch of other agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
tunnel_network_ports = l2pop_db.get_dvr_active_network_ports(
self.ctx.session, 'network_id')
tunnel_network_ports = l2pop_db.get_distributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(tunnel_network_ports))
_, agent = tunnel_network_ports[0]
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
def test_get_dvr_active_network_ports_no_candidate(self):
self._setup_port_binding()
self._setup_port_binding(
device_owner=constants.DEVICE_OWNER_DVR_INTERFACE)
# Register a bunch of non-L2 agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
tunnel_network_ports = l2pop_db.get_dvr_active_network_ports(
self.ctx.session, 'network_id')
tunnel_network_ports = l2pop_db.get_distributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(tunnel_network_ports))
def test_get_nondvr_active_network_ports(self):
@ -97,8 +160,8 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
helpers.register_l3_agent()
helpers.register_dhcp_agent()
helpers.register_ovs_agent()
fdb_network_ports = l2pop_db.get_nondvr_active_network_ports(
self.ctx.session, 'network_id')
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(1, len(fdb_network_ports))
_, agent = fdb_network_ports[0]
self.assertEqual(constants.AGENT_TYPE_OVS, agent.agent_type)
@ -108,6 +171,117 @@ class TestL2PopulationDBTestCase(testlib_api.SqlTestCase):
# Register a bunch of non-L2 agents on the same host
helpers.register_l3_agent()
helpers.register_dhcp_agent()
fdb_network_ports = l2pop_db.get_nondvr_active_network_ports(
self.ctx.session, 'network_id')
fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
self.ctx.session, TEST_NETWORK_ID)
self.assertEqual(0, len(fdb_network_ports))
def test__get_ha_router_interface_ids_with_ha_dvr_snat_port(self):
    """An HA DVR-SNAT port is reported as an HA router interface."""
    helpers.register_dhcp_agent()
    helpers.register_l3_agent()
    helpers.register_ovs_agent()
    self._create_ha_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(1, len(list(ha_iface_ids)))
def test__get_ha_router_interface_ids_with_ha_replicated_port(self):
    """An HA router-interface port is reported as an HA router port."""
    helpers.register_dhcp_agent()
    helpers.register_l3_agent()
    helpers.register_ovs_agent()
    self._create_ha_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_INTF,
        device_id=TEST_ROUTER_ID)
    ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(1, len(list(ha_iface_ids)))
def test__get_ha_router_interface_ids_with_no_ha_port(self):
    """A non-HA router's SNAT port is not reported as an HA port."""
    self._create_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    ha_iface_ids = l2pop_db._get_ha_router_interface_ids(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(0, len(list(ha_iface_ids)))
def test_active_network_ports_with_dvr_snat_port(self):
    """A non-HA DVR-SNAT port counts as a nondistributed network port."""
    # Test to get agent hosting dvr snat port
    helpers.register_l3_agent()
    helpers.register_dhcp_agent()
    helpers.register_ovs_agent()
    # create DVR router
    self._create_router()
    # setup DVR snat port
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    helpers.register_dhcp_agent()
    fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(1, len(fdb_network_ports))
def test_active_network_ports_with_ha_dvr_snat_port(self):
    """An HA DVR-SNAT port is excluded from nondistributed ports and
    both HA hosting agents show up in get_ha_active_network_ports."""
    # test to get HA agents hosting HA+DVR snat port
    helpers.register_dhcp_agent()
    helpers.register_l3_agent()
    helpers.register_ovs_agent()
    # create HA+DVR router
    self._create_ha_router()
    # setup HA snat port
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    fdb_network_ports = l2pop_db.get_nondistributed_active_network_ports(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(0, len(fdb_network_ports))
    ha_ports = l2pop_db.get_ha_active_network_ports(
        self.ctx.session, TEST_NETWORK_ID)
    self.assertEqual(2, len(ha_ports))
def test_active_port_count_with_dvr_snat_port(self):
    """A non-HA SNAT port is counted only on its bound host."""
    helpers.register_l3_agent()
    helpers.register_dhcp_agent()
    helpers.register_ovs_agent()
    self._create_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    helpers.register_dhcp_agent()
    port_count = l2pop_db.get_agent_network_active_port_count(
        self.ctx.session, HOST, TEST_NETWORK_ID)
    self.assertEqual(1, port_count)
    port_count = l2pop_db.get_agent_network_active_port_count(
        self.ctx.session, HOST_2, TEST_NETWORK_ID)
    self.assertEqual(0, port_count)
def test_active_port_count_with_ha_dvr_snat_port(self):
    """An HA SNAT port is counted once on every HA hosting agent."""
    helpers.register_dhcp_agent()
    helpers.register_l3_agent()
    helpers.register_ovs_agent()
    self._create_ha_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    port_count = l2pop_db.get_agent_network_active_port_count(
        self.ctx.session, HOST, TEST_NETWORK_ID)
    self.assertEqual(1, port_count)
    port_count = l2pop_db.get_agent_network_active_port_count(
        self.ctx.session, HOST_2, TEST_NETWORK_ID)
    self.assertEqual(1, port_count)
def test_get_ha_agents_by_router_id(self):
    """Both hosting agents (HOST, HOST_2) are returned; HOST_3 is not."""
    helpers.register_dhcp_agent()
    helpers.register_l3_agent()
    helpers.register_ovs_agent()
    self._create_ha_router()
    self._setup_port_binding(
        device_owner=constants.DEVICE_OWNER_ROUTER_SNAT,
        device_id=TEST_ROUTER_ID)
    agents = l2pop_db.get_ha_agents_by_router_id(
        self.ctx.session, TEST_ROUTER_ID)
    ha_agents = [agent.host for agent in agents]
    self.assertEqual(tools.UnorderedList([HOST, HOST_2]), ha_agents)

View File

@ -17,12 +17,18 @@ import mock
from oslo_serialization import jsonutils
import testtools
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import topics
from neutron import context
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_hamode_db
from neutron.extensions import portbindings
from neutron.extensions import providernet as pnet
from neutron import manager
from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2.drivers.l2pop import db as l2pop_db
@ -31,6 +37,7 @@ from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import rpc
from neutron.scheduler import l3_agent_scheduler
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit.plugins.ml2 import test_plugin
@ -40,12 +47,20 @@ HOST_2 = HOST + '_2'
HOST_3 = HOST + '_3'
HOST_4 = HOST + '_4'
HOST_5 = HOST + '_5'
TEST_ROUTER_ID = 'router_id'
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
DEVICE_OWNER_COMPUTE = constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
class FakeL3PluginWithAgents(common_db_mixin.CommonDbMixin,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agents_db.AgentDbMixin):
pass
class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
_mechanism_drivers = ['openvswitch', 'fake_agent', 'l2population']
@ -101,6 +116,18 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
uptime_patch = mock.patch(uptime, return_value=190)
uptime_patch.start()
def _setup_l3(self):
    """Install the fake L3 plugin and register ml2 + l3 agents.

    HA interface update notifications are mocked out so no RPC is sent.
    """
    notif_p = mock.patch.object(l3_hamode_db.L3_HA_NAT_db_mixin,
                                '_notify_ha_interfaces_updated')
    self.notif_m = notif_p.start()
    self.plugin = FakeL3PluginWithAgents()
    self._register_ml2_agents()
    self._register_l3_agents()
def _register_l3_agents(self):
    """Register l3 agents on HOST and HOST_2 and keep handles to them."""
    self.agent1 = helpers.register_l3_agent(host=HOST)
    self.agent2 = helpers.register_l3_agent(host=HOST_2)
def _register_ml2_agents(self):
helpers.register_ovs_agent(host=HOST, tunneling_ip='20.0.0.1')
helpers.register_ovs_agent(host=HOST_2, tunneling_ip='20.0.0.2')
@ -167,6 +194,216 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
result = jsonutils.loads(jsonutils.dumps(payload))
self.assertEqual(entry, result['netuuid']['ports']['1'][0])
def _create_router(self, ha=True, tenant_id='tenant1',
                   distributed=None, ctx=None):
    """Create a router (HA by default) through the fake L3 plugin."""
    if ctx is None:
        ctx = self.adminContext
    ctx.tenant_id = tenant_id
    router = {'name': TEST_ROUTER_ID, 'admin_state_up': True,
              'tenant_id': ctx.tenant_id}
    if ha is not None:
        router['ha'] = ha
    if distributed is not None:
        router['distributed'] = distributed
    return self.plugin.create_router(ctx, {'router': router})
def _bind_router(self, router_id):
    """Schedule the HA router to all L3 agents and bind its HA ports."""
    with self.adminContext.session.begin(subtransactions=True):
        scheduler = l3_agent_scheduler.ChanceScheduler()
        filters = {'agent_type': [constants.AGENT_TYPE_L3]}
        agents_db = self.plugin.get_agents_db(self.adminContext,
                                              filters=filters)
        scheduler._bind_ha_router_to_agents(
            self.plugin,
            self.adminContext,
            router_id,
            agents_db)
    self._bind_ha_network_ports(router_id)
def _bind_ha_network_ports(self, router_id):
    """Bind each HA network port to its hosting l3 agent's host."""
    port_bindings = self.plugin.get_ha_router_port_bindings(
        self.adminContext, [router_id])
    plugin = manager.NeutronManager.get_plugin()
    for port_binding in port_bindings:
        filters = {'id': [port_binding.port_id]}
        port = plugin.get_ports(self.adminContext, filters=filters)[0]
        if port_binding.l3_agent_id == self.agent1['id']:
            port[portbindings.HOST_ID] = self.agent1['host']
        else:
            port[portbindings.HOST_ID] = self.agent2['host']
        plugin.update_port(self.adminContext, port['id'],
                           {attributes.PORT: port})
def _get_first_interface(self, net_id, router_id):
    """Return the first router-interface port of *router_id*.

    NOTE(review): *net_id* is accepted for call-site symmetry but is
    not used in the lookup — the filter is by device id/owner only.
    """
    core_plugin = manager.NeutronManager.get_plugin()
    filters = {
        'device_id': [router_id],
        'device_owner': [constants.DEVICE_OWNER_ROUTER_INTF],
    }
    return core_plugin.get_ports(self.adminContext, filters=filters)[0]
def _add_router_interface(self, subnet, router, host):
    """Attach *subnet* to *router*, mark *host* active, and bring the
    resulting interface port up from *host*.

    Mocks are reset just before update_device_up so callers can assert
    only the RPC traffic produced by that call. Returns the port.
    """
    self.plugin.add_router_interface(
        self.adminContext, router['id'], {'subnet_id': subnet['id']})
    self.plugin.update_routers_states(
        self.adminContext,
        {router['id']: constants.HA_ROUTER_STATE_ACTIVE}, host)
    intf_port = self._get_first_interface(subnet['network_id'],
                                          router['id'])
    self.mock_cast.reset_mock()
    self.mock_fanout.reset_mock()
    self.callbacks.update_device_up(self.adminContext, agent_id=host,
                                    device=intf_port['id'], host=host)
    return intf_port
def _create_ha_router(self):
    """Build an HA router scheduled and bound on both L3 agents."""
    self._setup_l3()
    ha_router = self._create_router()
    self._bind_router(ha_router['id'])
    return ha_router
def _verify_remove_fdb(self, expected, agent_id, device, host=None):
    """Bring *device* down and assert the fanned-out FDB removal.

    NOTE(review): the *agent_id* parameter is ignored — the call passes
    agent_id=host, matching the original helper; confirm callers rely
    on that before changing it.
    """
    self.mock_fanout.reset_mock()
    self.callbacks.update_device_down(
        self.adminContext, agent_id=host, device=device, host=host)
    self.mock_fanout.assert_called_with(
        mock.ANY, 'remove_fdb_entries', expected)
def test_other_agents_get_flood_entries_for_ha_agents(self):
    # First HA router port is added on HOST and HOST2, then network port
    # is added on HOST4.
    # HOST4 should get flood entries for HOST1 and HOST2
    router = self._create_ha_router()
    # Patch the service-plugin registry so l2pop resolves HA bindings
    # through our fake L3 plugin.
    service_plugins = manager.NeutronManager.get_service_plugins()
    service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
    with self.subnet(network=self._network, enable_dhcp=False) as snet, \
            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
                       return_value=service_plugins):
        subnet = snet['subnet']
        port = self._add_router_interface(subnet, router, HOST)
        host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True}
        with self.port(subnet=snet,
                       device_owner=DEVICE_OWNER_COMPUTE,
                       arg_list=(portbindings.HOST_ID,),
                       **host_arg) as port1:
            p1 = port1['port']
            device1 = 'tap' + p1['id']
            # Isolate the RPC traffic caused by bringing up the
            # compute port on HOST_4.
            self.mock_cast.reset_mock()
            self.mock_fanout.reset_mock()
            self.callbacks.update_device_up(
                self.adminContext, agent_id=HOST_4, device=device1)
            # HOST_4 must receive flood entries for BOTH HA hosts'
            # VTEPs — including the backup agent's — so failover does
            # not depend on neutron recreating flows.
            cast_expected = {
                port['network_id']: {
                    'ports': {'20.0.0.1': [constants.FLOODING_ENTRY],
                              '20.0.0.2': [constants.FLOODING_ENTRY]},
                    'network_type': 'vxlan', 'segment_id': 1}}
            self.assertEqual(1, self.mock_cast.call_count)
            self.mock_cast.assert_called_with(
                mock.ANY, 'add_fdb_entries', cast_expected, HOST_4)
def test_delete_ha_port(self):
    # First network port is added on HOST, and then HA router port
    # is added on HOST and HOST2.
    # Remove_fdb should carry flood entry of only HOST2 and not HOST
    router = self._create_ha_router()
    # Patch the service-plugin registry so l2pop resolves HA bindings
    # through our fake L3 plugin.
    service_plugins = manager.NeutronManager.get_service_plugins()
    service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
    with self.subnet(network=self._network, enable_dhcp=False) as snet, \
            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
                       return_value=service_plugins):
        host_arg = {portbindings.HOST_ID: HOST, 'admin_state_up': True}
        with self.port(subnet=snet,
                       device_owner=DEVICE_OWNER_COMPUTE,
                       arg_list=(portbindings.HOST_ID,),
                       **host_arg) as port1:
            p1 = port1['port']
            device1 = 'tap' + p1['id']
            self.callbacks.update_device_up(self.adminContext,
                                            agent_id=HOST, device=device1)
            subnet = snet['subnet']
            port = self._add_router_interface(subnet, router, HOST)
            # HOST still hosts the compute port, so only the backup
            # host's (HOST_2) flood entry may be removed.
            expected = {port['network_id']:
                        {'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]},
                         'network_type': 'vxlan', 'segment_id': 1}}
            self.mock_fanout.reset_mock()
            interface_info = {'subnet_id': subnet['id']}
            self.plugin.remove_router_interface(self.adminContext,
                                                router['id'], interface_info)
            self.mock_fanout.assert_called_with(
                mock.ANY, 'remove_fdb_entries', expected)
def test_ha_agents_get_other_fdb(self):
    # First network port is added on HOST4, then HA router port is
    # added on HOST and HOST2.
    # Both HA agents should create tunnels to HOST4 and among themselves.
    # Both HA agents should be notified to other agents.
    router = self._create_ha_router()
    # Patch the service-plugin registry so l2pop resolves HA bindings
    # through our fake L3 plugin.
    service_plugins = manager.NeutronManager.get_service_plugins()
    service_plugins[service_constants.L3_ROUTER_NAT] = self.plugin
    with self.subnet(network=self._network, enable_dhcp=False) as snet, \
            mock.patch('neutron.manager.NeutronManager.get_service_plugins',
                       return_value=service_plugins):
        host_arg = {portbindings.HOST_ID: HOST_4, 'admin_state_up': True}
        with self.port(subnet=snet,
                       device_owner=DEVICE_OWNER_COMPUTE,
                       arg_list=(portbindings.HOST_ID,),
                       **host_arg) as port1:
            p1 = port1['port']
            device1 = 'tap' + p1['id']
            self.callbacks.update_device_up(
                self.adminContext, agent_id=HOST_4, device=device1)
            p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
            subnet = snet['subnet']
            # Active agent (HOST) wires the HA router port first.
            port = self._add_router_interface(subnet, router, HOST)
            # Other agents learn only HOST's flood entry via fanout.
            fanout_expected = {port['network_id']: {
                'ports': {'20.0.0.1': [constants.FLOODING_ENTRY]},
                'network_type': 'vxlan', 'segment_id': 1}}
            # HOST gets the compute port's unicast+flood entries plus
            # the other HA host's (HOST_2) flood entry.
            cast_expected_host = {port['network_id']: {
                'ports': {
                    '20.0.0.4': [constants.FLOODING_ENTRY,
                                 l2pop_rpc.PortInfo(p1['mac_address'],
                                                    p1_ips[0])],
                    '20.0.0.2': [constants.FLOODING_ENTRY]},
                'network_type': 'vxlan', 'segment_id': 1}}
            self.mock_cast.assert_called_with(
                mock.ANY, 'add_fdb_entries', cast_expected_host, HOST)
            self.mock_fanout.assert_called_with(
                mock.ANY, 'add_fdb_entries', fanout_expected)
            # Backup agent (HOST_2) now reports the same port up.
            self.mock_cast.reset_mock()
            self.mock_fanout.reset_mock()
            self.callbacks.update_device_up(
                self.adminContext, agent_id=HOST_2,
                device=port['id'], host=HOST_2)
            # HOST_2 likewise gets the compute port entries plus the
            # active HA host's flood entry — tunnels among HA agents.
            cast_expected_host2 = {port['network_id']: {
                'ports': {
                    '20.0.0.4': [constants.FLOODING_ENTRY,
                                 l2pop_rpc.PortInfo(p1['mac_address'],
                                                    p1_ips[0])],
                    '20.0.0.1': [constants.FLOODING_ENTRY]},
                'network_type': 'vxlan', 'segment_id': 1}}
            # Everyone else is told about HOST_2's VTEP too.
            fanout_expected = {port['network_id']: {
                'ports': {'20.0.0.2': [constants.FLOODING_ENTRY]},
                'network_type': 'vxlan', 'segment_id': 1}}
            self.mock_cast.assert_called_with(
                mock.ANY, 'add_fdb_entries', cast_expected_host2, HOST_2)
            self.mock_fanout.assert_called_with(
                mock.ANY, 'add_fdb_entries', fanout_expected)
def test_fdb_add_called(self):
self._register_ml2_agents()
@ -839,12 +1076,15 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
l2pop_mech = l2pop_mech_driver.L2populationMechanismDriver()
l2pop_mech.L2PopulationAgentNotify = mock.Mock()
l2pop_mech.rpc_ctx = mock.Mock()
port = {'device_owner': ''}
context = mock.Mock()
context.current = port
with mock.patch.object(l2pop_mech,
'_get_agent_fdb',
return_value=None) as upd_port_down,\
mock.patch.object(l2pop_mech.L2PopulationAgentNotify,
'remove_fdb_entries'):
l2pop_mech.delete_port_postcommit(mock.Mock())
l2pop_mech.delete_port_postcommit(context)
self.assertTrue(upd_port_down.called)
def test_delete_unbound_port(self):
@ -916,9 +1156,11 @@ class TestL2PopulationMechDriver(base.BaseTestCase):
with mock.patch.object(l2pop_db, 'get_agent_ip',
side_effect=agent_ip_side_effect),\
mock.patch.object(l2pop_db, 'get_nondvr_active_network_ports',
mock.patch.object(l2pop_db,
'get_nondistributed_active_network_ports',
return_value=fdb_network_ports),\
mock.patch.object(l2pop_db, 'get_dvr_active_network_ports',
mock.patch.object(l2pop_db,
'get_distributed_active_network_ports',
return_value=tunnel_network_ports):
session = mock.Mock()
agent = mock.Mock()

View File

@ -58,7 +58,8 @@ class RpcCallbacksTestCase(base.BaseTestCase):
}
with mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin'
'._device_to_port_id'):
with mock.patch('neutron.callbacks.registry.notify') as notify:
with mock.patch('neutron.callbacks.registry.notify') as notify,\
mock.patch.object(self.callbacks, 'notify_ha_port_status'):
self.callbacks.update_device_up(mock.Mock(), **kwargs)
return notify
@ -213,6 +214,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def _test_update_device_not_bound_to_host(self, func):
self.plugin.port_bound_to_host.return_value = False
self.callbacks.notify_ha_port_status = mock.Mock()
self.plugin._device_to_port_id.return_value = 'fake_port_id'
res = func(mock.Mock(), device='fake_device', host='fake_host')
self.plugin.port_bound_to_host.assert_called_once_with(mock.ANY,
@ -232,6 +234,7 @@ class RpcCallbacksTestCase(base.BaseTestCase):
def test_update_device_down_call_update_port_status(self):
self.plugin.update_port_status.return_value = False
self.callbacks.notify_ha_port_status = mock.Mock()
self.plugin._device_to_port_id.return_value = 'fake_port_id'
self.assertEqual(
{'device': 'fake_device', 'exists': False},