Make L3RpcCallback a separate callback class

RPC has a version of itself. In Neutron a plugin implements
several RPC interface, so a single RPC version doesn't work.
In Mixin callback class approach, RPC versioning depends on
each plugin implementation and it makes harder to maintain
RPC version appropriately. This patch series replaces mixin
RPC callback of server side with a separate class.

This commit handles server-side callback of L3-agent RPC interface.
L3-agent server-side callback class is moved from db/ to
api/rpc/handlers because it doesn't involve any db operations
and defining all RPC interfaces in a single place sounds reasonable.

Note that moving other L3-agent related RPC interface class
to api/rpc/handlers will be done in a separate patch as this patch
focuses on reorganizing the server-side RPC callback class.

Partial-Bug: #1359416
Change-Id: Ie3f2c9b2ad907a1110e05fe94d42e41e93fbcaa7
This commit is contained in:
Akihiro Motoki 2014-08-19 03:49:30 +09:00
parent 3f16a69544
commit 73719a80bf
17 changed files with 99 additions and 112 deletions

View File

@ -17,6 +17,7 @@ from oslo.config import cfg
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron import context as neutron_context
from neutron.extensions import l3
@ -30,8 +31,14 @@ from neutron.plugins.common import constants as plugin_constants
LOG = logging.getLogger(__name__)
class L3RpcCallbackMixin(object):
"""A mix-in that enable L3 agent rpc support in plugin implementations."""
class L3RpcCallback(n_rpc.RpcCallback):
"""L3 agent RPC callback in plugin implementations."""
# 1.0 L3PluginApi BASE_RPC_API_VERSION
# 1.1 Support update_floatingip_statuses
# 1.2 Added methods for DVR support
# 1.3 Added a method that returns the list of activated services
RPC_API_VERSION = '1.3'
@property
def plugin(self):

View File

@ -28,6 +28,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
@ -40,7 +41,6 @@ from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import portbindings
@ -79,7 +79,6 @@ cfg.CONF.register_opts(PHYSICAL_INTERFACE_OPTS, "PHYSICAL_INTERFACE")
class BridgeRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
"""Agent callback."""
@ -264,6 +263,7 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
is_admin=False)
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [BridgeRpcCallbacks(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -23,6 +23,7 @@ from oslo.config import cfg as q_conf
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
@ -36,7 +37,6 @@ from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_db
from neutron.extensions import portbindings
from neutron.extensions import providernet
@ -59,8 +59,7 @@ LOG = logging.getLogger(__name__)
class N1kvRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
dhcp_rpc_base.DhcpRpcCallbackMixin):
"""Class to handle agent RPC calls."""
@ -128,7 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.service_topics = {svc_constants.CORE: topics.PLUGIN,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()]
self.endpoints = [N1kvRpcCallbacks(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()

View File

@ -16,6 +16,7 @@
from oslo.config import cfg
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
@ -188,6 +189,7 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -17,7 +17,6 @@
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
from neutron.openstack.common import log as logging
from neutron.plugins.hyperv import db as hyperv_db
@ -27,8 +26,7 @@ LOG = logging.getLogger(__name__)
class HyperVRpcCallbacks(
n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin):
dhcp_rpc_base.DhcpRpcCallbackMixin):
# history
# 1.1 Support Security Group RPC

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
@ -35,7 +36,6 @@ from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -55,7 +55,6 @@ LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin
):
@ -285,6 +284,7 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [LinuxBridgeRpcCallbacks(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
@ -120,6 +121,7 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -18,7 +18,6 @@ from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
from neutron.db import l3_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.openstack.common import log as logging
from neutron.plugins.mlnx.db import mlnx_db_v2 as db
@ -28,7 +27,6 @@ LOG = logging.getLogger(__name__)
class MlnxRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
# History
# 1.1 Support Security Group RPC

View File

@ -17,6 +17,7 @@
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes as attrs
from neutron.common import constants as const
from neutron.common import exceptions as n_exc
@ -28,7 +29,6 @@ from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
@ -147,7 +147,7 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.endpoints = [
NECPluginV2RPCCallbacks(self.safe_reference),
DhcpRpcCallback(),
L3RpcCallback(),
l3_rpc.L3RpcCallback(),
self.callback_sg,
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
@ -686,12 +686,6 @@ class DhcpRpcCallback(n_rpc.RpcCallback,
RPC_API_VERSION = '1.1'
class L3RpcCallback(n_rpc.RpcCallback, l3_rpc_base.L3RpcCallbackMixin):
# 1.0 L3PluginApi BASE_RPC_API_VERSION
# 1.1 Support update_floatingip_statuses
RPC_API_VERSION = '1.1'
class SecurityGroupServerRpcCallback(
n_rpc.RpcCallback,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):

View File

@ -21,6 +21,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as nexception
from neutron.common import rpc as n_rpc
@ -33,7 +34,6 @@ from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -53,7 +53,6 @@ IPv6 = 6
class NVSDPluginRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.1'
@ -162,6 +161,7 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.endpoints = [NVSDPluginRpcCallbacks(),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -20,6 +20,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
@ -36,7 +37,6 @@ from neutron.db import extradhcpopt_db
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -60,7 +60,6 @@ LOG = logging.getLogger(__name__)
class OVSRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
# history
@ -346,6 +345,7 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type),
l3_rpc.L3RpcCallback(),
agents_db.AgentExtRpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)

View File

@ -20,6 +20,7 @@ from ryu.app import client
from ryu.app import rest_nw_id
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as q_const
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
@ -30,7 +31,6 @@ from neutron.db import dhcp_rpc_base
from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import models_v2
from neutron.db import portbindings_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
@ -47,7 +47,6 @@ LOG = logging.getLogger(__name__)
class RyuRpcCallbacks(n_rpc.RpcCallback,
dhcp_rpc_base.DhcpRpcCallbackMixin,
l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin):
RPC_API_VERSION = '1.1'
@ -139,7 +138,8 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)]
self.endpoints = [RyuRpcCallbacks(self.ofp_api_host),
l3_rpc.L3RpcCallback()]
for svc_topic in self.service_topics.values():
self.conn.create_consumer(svc_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()

View File

@ -21,6 +21,7 @@ import threading
from oslo.config import cfg
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as q_const
from neutron.common import log
from neutron.common import rpc as q_rpc
@ -30,7 +31,6 @@ from neutron.db import db_base_plugin_v2
from neutron.db import extraroute_db
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
@ -41,12 +41,6 @@ from neutron.plugins.ml2.drivers.arista.arista_l3_driver import NeutronNets # n
LOG = logging.getLogger(__name__)
class AristaL3ServicePluginRpcCallbacks(q_rpc.RpcCallback,
l3_rpc_base.L3RpcCallbackMixin):
RPC_API_VERSION = '1.2'
class AristaL3ServicePlugin(db_base_plugin_v2.NeutronDbPluginV2,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
@ -76,7 +70,7 @@ class AristaL3ServicePlugin(db_base_plugin_v2.NeutronDbPluginV2,
self.conn = q_rpc.create_connection(new=True)
self.agent_notifiers.update(
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [AristaL3ServicePluginRpcCallbacks()]
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
self.conn.consume_in_threads()

View File

@ -18,6 +18,7 @@
from oslo.config import cfg
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants as q_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
@ -26,20 +27,10 @@ from neutron.db import extraroute_db
from neutron.db import l3_dvr_db
from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.openstack.common import importutils
from neutron.plugins.common import constants
class L3RouterPluginRpcCallbacks(n_rpc.RpcCallback,
l3_rpc_base.L3RpcCallbackMixin):
RPC_API_VERSION = '1.3'
# history
# 1.2 Added methods for DVR support
# 1.3 Added a method that returns the list of activated services
class L3RouterPlugin(common_db_mixin.CommonDbMixin,
extraroute_db.ExtraRoute_db_mixin,
l3_dvr_db.L3_NAT_with_dvr_db_mixin,
@ -70,7 +61,7 @@ class L3RouterPlugin(common_db_mixin.CommonDbMixin,
self.conn = n_rpc.create_connection(new=True)
self.agent_notifiers.update(
{q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [L3RouterPluginRpcCallbacks()]
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
self.conn.consume_in_threads()

View File

@ -15,8 +15,8 @@
import contextlib
from neutron.api.rpc.handlers import l3_rpc
from neutron.common import constants
from neutron.db import l3_rpc_base
from neutron.tests.unit.nec import test_nec_plugin
from neutron.tests.unit.openvswitch import test_agent_scheduler
@ -75,10 +75,10 @@ class NecL3AgentSchedulerWithOpenFlowRouter(
self.router(arg_list=('provider',),
provider='openflow'
)) as (r1, r2):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
l3_agents = self._list_l3_agents_hosting_router(
r1['router']['id'])
self.assertEqual(1, len(ret_a))
@ -93,9 +93,9 @@ class NecL3AgentSchedulerWithOpenFlowRouter(
self.router(arg_list=('provider',), provider='openflow'),
self.router(arg_list=('provider',), provider='openflow')
) as (r1, r2):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
l3_agents_1 = self._list_l3_agents_hosting_router(
r1['router']['id'])
l3_agents_2 = self._list_l3_agents_hosting_router(

View File

@ -23,13 +23,13 @@ from webob import exc
from neutron.api import extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron import context
from neutron.db import agents_db
from neutron.db import dhcp_rpc_base
from neutron.db import l3_agentschedulers_db
from neutron.db import l3_rpc_base
from neutron.extensions import agent
from neutron.extensions import dhcpagentscheduler
from neutron.extensions import l3agentscheduler
@ -645,11 +645,11 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
def test_router_is_not_rescheduled_from_alive_agent(self):
with self.router():
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
# schedule the router to host A
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
with mock.patch('neutron.db.l3_agentschedulers_db.'
'L3AgentSchedulerDbMixin.reschedule_router') as rr:
# take down some unrelated agent and run reschedule check
@ -658,24 +658,24 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
def test_router_reschedule_from_dead_agent(self):
with self.router():
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
# schedule the router to host A
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
self._take_down_agent_and_run_reschedule(L3_HOSTA)
# B should now pick up the router
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
self.assertEqual(ret_b, ret_a)
def test_router_no_reschedule_from_dead_admin_down_agent(self):
with self.router() as r:
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
# schedule the router to host A
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
self._set_agent_admin_state_up(L3_HOSTA, False)
self._take_down_agent_and_run_reschedule(L3_HOSTA)
@ -687,28 +687,28 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertEqual(binding.l3_agent.host, L3_HOSTA)
# B should not pick up the router
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
self.assertFalse(ret_b)
def test_router_auto_schedule_with_invalid_router(self):
with self.router() as router:
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
# deleted router
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router['router']['id']])
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router['router']['id']])
self.assertFalse(ret_a)
# non-existent router
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[uuidutils.generate_uuid()])
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[uuidutils.generate_uuid()])
self.assertFalse(ret_a)
def test_router_auto_schedule_with_hosted(self):
with self.router() as router:
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_b = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
ret_b = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])
self.assertEqual(1, len(ret_a))
@ -719,14 +719,14 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
def test_router_auto_schedule_restart_l3_agent(self):
with self.router():
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
def test_router_auto_schedule_with_hosted_2(self):
# one agent hosts one router
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
l3_hosta = {
'binary': 'neutron-l3-agent',
'host': L3_HOSTA,
@ -744,13 +744,13 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
l3_hostb['host'] = L3_HOSTB
with self.router() as router1:
self._register_one_agent_state(l3_hosta)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
with self.router() as router2:
self._register_one_agent_state(l3_hostb)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
l3_agents_1 = self._list_l3_agents_hosting_router(
router1['router']['id'])
l3_agents_2 = self._list_l3_agents_hosting_router(
@ -773,7 +773,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
def test_router_auto_schedule_with_disabled(self):
with contextlib.nested(self.router(),
self.router()):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
@ -781,9 +781,9 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
L3_HOSTB)
self._disable_agent(hosta_id)
# first agent will not host router since it is disabled
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
# second agent will host all the routers since first is disabled.
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTB)
hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
num_hostb_routers = len(hostb_routers['routers'])
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
@ -807,12 +807,12 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
'agent_type': constants.AGENT_TYPE_L3}
with contextlib.nested(self.router(),
self.router()) as (router1, router2):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
l3_hosta['configurations']['router_id'] = router1['router']['id']
self._register_one_agent_state(l3_hosta)
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
num_hosta_routers = len(hosta_routers['routers'])
l3_agents_1 = self._list_l3_agents_hosting_router(
@ -825,11 +825,11 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertEqual(0, len(l3_agents_2['agents']))
def test_rpc_sync_routers(self):
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
# No routers
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
self.assertEqual(0, len(ret_a))
with contextlib.nested(self.router(),
@ -838,26 +838,26 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
router_ids = [r['router']['id'] for r in routers]
# Get all routers
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA)
self.assertEqual(3, len(ret_a))
self.assertEqual(set(router_ids), set([r['id'] for r in ret_a]))
# Get all routers (router_ids=None)
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=None)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=None)
self.assertEqual(3, len(ret_a))
self.assertEqual(set(router_ids), set([r['id'] for r in ret_a]))
# Get router2 only
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router_ids[1]])
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router_ids[1]])
self.assertEqual(1, len(ret_a))
self.assertIn(router_ids[1], [r['id'] for r in ret_a])
# Get router1 and router3
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router_ids[0],
router_ids[2]])
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=[router_ids[0],
router_ids[2]])
self.assertEqual(2, len(ret_a))
self.assertIn(router_ids[0], [r['id'] for r in ret_a])
self.assertIn(router_ids[2], [r['id'] for r in ret_a])
@ -865,8 +865,8 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
def test_router_auto_schedule_for_specified_routers(self):
def _sync_router_with_ids(router_ids, exp_synced, exp_hosted, host_id):
ret_a = l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=router_ids)
ret_a = l3_rpc_cb.sync_routers(self.adminContext, host=L3_HOSTA,
router_ids=router_ids)
self.assertEqual(exp_synced, len(ret_a))
for r in router_ids:
self.assertIn(r, [r['id'] for r in ret_a])
@ -874,7 +874,7 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
num_host_routers = len(host_routers['routers'])
self.assertEqual(exp_hosted, num_host_routers)
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3, L3_HOSTA)

View File

@ -22,6 +22,7 @@ import netaddr
from oslo.config import cfg
from webob import exc
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
@ -33,7 +34,6 @@ from neutron.db import l3_agentschedulers_db
from neutron.db import l3_attrs_db
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import l3_rpc_base
from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import portbindings
@ -1992,15 +1992,15 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
self.subnet(),
self.subnet()) as (r, s1, s2):
self._set_net_external(s1['subnet']['network_id'])
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_one_l3_agent(
host='host1',
ext_net_id=s1['subnet']['network_id'])
self._register_one_l3_agent(
host='host2', internal_only=False,
ext_net_id=s2['subnet']['network_id'])
l3_rpc.sync_routers(self.adminContext,
host='host1')
l3_rpc_cb.sync_routers(self.adminContext,
host='host1')
self._assert_router_on_agent(r['router']['id'], 'host1')
self._add_external_gateway_to_router(
@ -2023,15 +2023,15 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
self.subnet(),
self.subnet()) as (r, s1, s2):
self._set_net_external(s1['subnet']['network_id'])
l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_one_l3_agent(
host='host1',
ext_net_id=s1['subnet']['network_id'])
self._register_one_l3_agent(
host='host2', internal_only=False,
ext_net_id='', ext_bridge='')
l3_rpc.sync_routers(self.adminContext,
host='host1')
l3_rpc_cb.sync_routers(self.adminContext,
host='host1')
self._assert_router_on_agent(r['router']['id'], 'host1')
self._add_external_gateway_to_router(
@ -2061,17 +2061,17 @@ class L3NatDBIntAgentSchedulingTestCase(L3BaseForIntTests,
expected_code=exc.HTTPBadRequest.code)
class L3RpcCallbackMixinTestCase(base.BaseTestCase):
class L3RpcCallbackTestCase(base.BaseTestCase):
def setUp(self):
super(L3RpcCallbackMixinTestCase, self).setUp()
super(L3RpcCallbackTestCase, self).setUp()
self.mock_plugin = mock.patch.object(
l3_rpc_base.L3RpcCallbackMixin,
l3_rpc.L3RpcCallback,
'plugin', new_callable=mock.PropertyMock).start()
self.mock_l3plugin = mock.patch.object(
l3_rpc_base.L3RpcCallbackMixin,
l3_rpc.L3RpcCallback,
'l3plugin', new_callable=mock.PropertyMock).start()
self.mixin = l3_rpc_base.L3RpcCallbackMixin()
self.l3_rpc_cb = l3_rpc.L3RpcCallback()
def test__ensure_host_set_on_port_update_on_concurrent_delete(self):
port_id = 'foo_port_id'
@ -2082,12 +2082,12 @@ class L3RpcCallbackMixinTestCase(base.BaseTestCase):
portbindings.VIF_TYPE: portbindings.VIF_TYPE_BINDING_FAILED
}
router_id = 'foo_router_id'
self.mixin.plugin.update_port.side_effect = n_exc.PortNotFound(
self.l3_rpc_cb.plugin.update_port.side_effect = n_exc.PortNotFound(
port_id=port_id)
with mock.patch.object(l3_rpc_base.LOG, 'debug') as mock_log:
self.mixin._ensure_host_set_on_port(
with mock.patch.object(l3_rpc.LOG, 'debug') as mock_log:
self.l3_rpc_cb._ensure_host_set_on_port(
mock.ANY, mock.ANY, port, router_id)
self.mixin.plugin.update_port.assert_called_once_with(
self.l3_rpc_cb.plugin.update_port.assert_called_once_with(
mock.ANY, port_id, {'port': {'binding:host_id': mock.ANY}})
self.assertTrue(mock_log.call_count)
expected_message = ('Port foo_port_id not found while updating '