Add agent scheduling for LBaaS namespace agent

- adds simple chance scheduling on create pool operation
- adds PoolsLoadbalancerAgentBinding db table
- adds lbaas_agentscheduler extension to list pools hosted by a particular agent
  and to get an agent hosting a particular pool
- adds agent notifiers mapping to AgentSchedulerDbMixin to make it easier
  for services to add their agent notifiers to the core plugin

Implements blueprint lbaas-agent-scheduler
Change-Id: Id98649fd5c7873dcd5be1a2b117b8bed25f06cc2
This commit is contained in:
Oleg Bondarev 2013-05-29 11:58:17 +04:00
parent f341ad5703
commit da65fe6951
27 changed files with 945 additions and 152 deletions

View File

@ -220,6 +220,8 @@ notification_topics = notifications
# network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler
# Driver to use for scheduling router to a default L3 agent
# router_scheduler_driver = neutron.scheduler.l3_agent_scheduler.ChanceScheduler
# Driver to use for scheduling a loadbalancer pool to an lbaas agent
# loadbalancer_pool_scheduler_driver = neutron.services.loadbalancer.agent_scheduler.ChanceScheduler
# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
# networks to first DHCP agent which sends get_active_networks message to

View File

@ -79,6 +79,8 @@
"get_l3-routers": "rule:admin_only",
"get_dhcp-agents": "rule:admin_only",
"get_l3-agents": "rule:admin_only",
"get_loadbalancer-agent": "rule:admin_only",
"get_loadbalancer-pools": "rule:admin_only",
"create_router": "rule:regular_user",
"get_router": "rule:admin_or_owner",

View File

@ -67,6 +67,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent'
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_NEC = 'NEC plugin agent'
AGENT_TYPE_L3 = 'L3 agent'
AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite'
@ -76,3 +77,4 @@ SORT_DIRECTION_DESC = 'desc'
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'

View File

@ -162,6 +162,9 @@ class AgentExtRpcCallback(object):
RPC_API_VERSION = '1.0'
START_TIME = timeutils.utcnow()
def __init__(self, plugin=None):
self.plugin = plugin
def report_state(self, context, **kwargs):
"""Report state from agent to server."""
time = kwargs['time']
@ -170,5 +173,6 @@ class AgentExtRpcCallback(object):
LOG.debug(_("Message with invalid timestamp received"))
return
agent_state = kwargs['agent_state']['agent_state']
plugin = manager.NeutronManager.get_plugin()
plugin.create_or_update_agent(context, agent_state)
if not self.plugin:
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.create_or_update_agent(context, agent_state)

View File

@ -79,8 +79,13 @@ class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
"""Common class for agent scheduler mixins."""
dhcp_agent_notifier = None
l3_agent_notifier = None
# agent notifiers to handle agent update operations;
# should be updated by plugins;
agent_notifiers = {
constants.AGENT_TYPE_DHCP: None,
constants.AGENT_TYPE_L3: None,
constants.AGENT_TYPE_LOADBALANCER: None,
}
@staticmethod
def is_eligible_agent(active, agent):
@ -100,17 +105,12 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
result = super(AgentSchedulerDbMixin, self).update_agent(
context, id, agent)
agent_data = agent['agent']
if ('admin_state_up' in agent_data and
agent_notifier = self.agent_notifiers.get(original_agent['agent_type'])
if (agent_notifier and
'admin_state_up' in agent_data and
original_agent['admin_state_up'] != agent_data['admin_state_up']):
if (original_agent['agent_type'] == constants.AGENT_TYPE_DHCP and
self.dhcp_agent_notifier):
self.dhcp_agent_notifier.agent_updated(
context, agent_data['admin_state_up'],
original_agent['host'])
elif (original_agent['agent_type'] == constants.AGENT_TYPE_L3 and
self.l3_agent_notifier):
self.l3_agent_notifier.agent_updated(
context, agent_data['admin_state_up'],
agent_notifier.agent_updated(context,
agent_data['admin_state_up'],
original_agent['host'])
return result
@ -148,8 +148,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
raise l3agentscheduler.RouterSchedulingFailed(
router_id=router_id, agent_id=id)
if self.l3_agent_notifier:
self.l3_agent_notifier.router_added_to_agent(
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
if l3_notifier:
l3_notifier.router_added_to_agent(
context, [router_id], agent_db.host)
def remove_router_from_l3_agent(self, context, id, router_id):
@ -170,8 +171,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
raise l3agentscheduler.RouterNotHostedByL3Agent(
router_id=router_id, agent_id=id)
context.session.delete(binding)
if self.l3_agent_notifier:
self.l3_agent_notifier.router_removed_from_agent(
l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
if l3_notifier:
l3_notifier.router_removed_from_agent(
context, router_id, agent.host)
def list_routers_on_l3_agent(self, context, id):
@ -356,8 +358,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
binding.dhcp_agent_id = id
binding.network_id = network_id
context.session.add(binding)
if self.dhcp_agent_notifier:
self.dhcp_agent_notifier.network_added_to_agent(
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
if dhcp_notifier:
dhcp_notifier.network_added_to_agent(
context, network_id, agent_db.host)
def remove_network_from_dhcp_agent(self, context, id, network_id):
@ -372,8 +375,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
raise dhcpagentscheduler.NetworkNotHostedByDhcpAgent(
network_id=network_id, agent_id=id)
context.session.delete(binding)
if self.dhcp_agent_notifier:
self.dhcp_agent_notifier.network_removed_from_agent(
dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
if dhcp_notifier:
dhcp_notifier.network_removed_from_agent(
context, network_id, agent.host)
def list_networks_on_dhcp_agent(self, context, id):

View File

@ -0,0 +1,53 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""LBaaS Pool scheduler
Revision ID: 52c5e4a18807
Revises: 2032abe8edac
Create Date: 2013-06-14 03:23:47.815865
"""
# revision identifiers, used by Alembic.
revision = '52c5e4a18807'
down_revision = '2032abe8edac'
from alembic import op
import sqlalchemy as sa
def upgrade(active_plugin=None, options=None):
### commands auto generated by Alembic - please adjust! ###
op.create_table(
'poolloadbalanceragentbindings',
sa.Column('pool_id', sa.String(length=36), nullable=False),
sa.Column('loadbalancer_agent_id', sa.String(length=36),
nullable=False),
sa.ForeignKeyConstraint(['loadbalancer_agent_id'], ['agents.id'],
ondelete='CASCADE'),
sa.ForeignKeyConstraint(['pool_id'], ['pools.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('pool_id')
)
### end Alembic commands ###
def downgrade(active_plugin=None, options=None):
### commands auto generated by Alembic - please adjust! ###
op.drop_table('poolloadbalanceragentbindings')
### end Alembic commands ###

View File

@ -0,0 +1,138 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import abstractmethod
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource
from neutron.common import constants
from neutron.extensions import agent
from neutron import manager
from neutron.plugins.common import constants as plugin_const
from neutron import policy
from neutron import wsgi
LOADBALANCER_POOL = 'loadbalancer-pool'
LOADBALANCER_POOLS = LOADBALANCER_POOL + 's'
LOADBALANCER_AGENT = 'loadbalancer-agent'
class PoolSchedulerController(wsgi.Controller):
def index(self, request, **kwargs):
lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
plugin_const.LOADBALANCER)
if not lbaas_plugin:
return {'pools': []}
policy.enforce(request.context,
"get_%s" % LOADBALANCER_POOLS,
{},
plugin=lbaas_plugin)
return lbaas_plugin.list_pools_on_lbaas_agent(
request.context, kwargs['agent_id'])
class LbaasAgentHostingPoolController(wsgi.Controller):
def index(self, request, **kwargs):
lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
plugin_const.LOADBALANCER)
if not lbaas_plugin:
return
policy.enforce(request.context,
"get_%s" % LOADBALANCER_AGENT,
{},
plugin=lbaas_plugin)
return lbaas_plugin.get_lbaas_agent_hosting_pool(
request.context, kwargs['pool_id'])
class Lbaas_agentscheduler(extensions.ExtensionDescriptor):
"""Extension class supporting l3 agent scheduler.
"""
@classmethod
def get_name(cls):
return "Loadbalancer Agent Scheduler"
@classmethod
def get_alias(cls):
return constants.LBAAS_AGENT_SCHEDULER_EXT_ALIAS
@classmethod
def get_description(cls):
return "Schedule pools among lbaas agents"
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/lbaas_agent_scheduler/api/v1.0"
@classmethod
def get_updated(cls):
return "2013-02-07T10:00:00-00:00"
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
exts = []
parent = dict(member_name="agent",
collection_name="agents")
controller = resource.Resource(PoolSchedulerController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
LOADBALANCER_POOLS, controller, parent))
parent = dict(member_name="pool",
collection_name="pools")
controller = resource.Resource(LbaasAgentHostingPoolController(),
base.FAULT_MAP)
exts.append(extensions.ResourceExtension(
LOADBALANCER_AGENT, controller, parent,
path_prefix=plugin_const.
COMMON_PREFIXES[plugin_const.LOADBALANCER]))
return exts
def get_extended_resources(self, version):
return {}
class NoEligibleLbaasAgent(agent.AgentNotFound):
message = _("No eligible loadbalancer agent found "
"for pool %(pool_id)s.")
class NoActiveLbaasAgent(agent.AgentNotFound):
message = _("No active loadbalancer agent found "
"for pool %(pool_id)s.")
class LbaasAgentSchedulerPluginBase(object):
"""REST API to operate the lbaas agent scheduler.
All of method must be in an admin context.
"""
@abstractmethod
def list_pools_on_lbaas_agent(self, context, id):
pass
@abstractmethod
def get_lbaas_agent_hosting_pool(self, context, pool_id):
pass

View File

@ -177,6 +177,12 @@ class NeutronManager(object):
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
# search for possible agent notifiers declared in service plugin
# (needed by agent management extension)
if (hasattr(self.plugin, 'agent_notifiers') and
hasattr(plugin_inst, 'agent_notifiers')):
self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
LOG.debug(_("Successfully loaded %(type)s plugin. "
"Description: %(desc)s"),
{"type": plugin_inst.get_plugin_type(),

View File

@ -30,6 +30,7 @@ from oslo.config import cfg
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron.common import utils
@ -254,8 +255,12 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
def create_network(self, context, network):
"""Create network.

View File

@ -268,8 +268,12 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
def _parse_network_vlan_ranges(self):
try:

View File

@ -107,8 +107,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _setup_rpc(self):
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)

View File

@ -19,6 +19,7 @@
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
@ -119,8 +120,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()

View File

@ -819,7 +819,8 @@ class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.dispatcher = NVPRpcCallbacks().create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.agent_notifiers[constants.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
# Consume from all consumers in a thread
self.conn.consume_in_thread()

View File

@ -309,8 +309,12 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotify
)
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,

View File

@ -0,0 +1,114 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import joinedload
from neutron.common import constants
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import model_base
from neutron.extensions import lbaas_agentscheduler
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class PoolLoadbalancerAgentBinding(model_base.BASEV2):
"""Represents binding between neutron loadbalancer pools and agents."""
pool_id = sa.Column(sa.String(36),
sa.ForeignKey("pools.id", ondelete='CASCADE'),
primary_key=True)
agent = orm.relation(agents_db.Agent)
agent_id = sa.Column(sa.String(36), sa.ForeignKey("agents.id",
ondelete='CASCADE'))
class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
lbaas_agentscheduler
.LbaasAgentSchedulerPluginBase):
def get_lbaas_agent_hosting_pool(self, context, pool_id, active=None):
query = context.session.query(PoolLoadbalancerAgentBinding)
query = query.options(joinedload('agent'))
binding = query.get(pool_id)
if (binding and self.is_eligible_agent(
active, binding.agent)):
return {'agent': self._make_agent_dict(binding.agent)}
def get_lbaas_agents(self, context, active=None, filters=None):
query = context.session.query(agents_db.Agent)
query = query.filter_by(agent_type=constants.AGENT_TYPE_LOADBALANCER)
if active is not None:
query = query.filter_by(admin_state_up=active)
if filters:
for key, value in filters.iteritems():
column = getattr(agents_db.Agent, key, None)
if column:
query = query.filter(column.in_(value))
return [agent
for agent in query
if self.is_eligible_agent(active, agent)]
def list_pools_on_lbaas_agent(self, context, id):
query = context.session.query(PoolLoadbalancerAgentBinding.pool_id)
query = query.filter_by(agent_id=id)
pool_ids = [item[0] for item in query]
if pool_ids:
return {'pools': self.get_pools(context, filters={'id': pool_ids})}
else:
return {'pools': []}
class ChanceScheduler(object):
"""Allocate a loadbalancer agent for a vip in a random way."""
def schedule(self, plugin, context, pool):
"""Schedule the pool to an active loadbalancer agent if there
is no enabled agent hosting it.
"""
with context.session.begin(subtransactions=True):
lbaas_agent = plugin.get_lbaas_agent_hosting_pool(
context, pool['id'])
if lbaas_agent:
LOG.debug(_('Pool %(pool_id)s has already been hosted'
' by lbaas agent %(agent_id)s'),
{'pool_id': pool['id'],
'agent_id': lbaas_agent['id']})
return
candidates = plugin.get_lbaas_agents(context, active=True)
if not candidates:
LOG.warn(_('No active lbaas agents for pool %s') % pool['id'])
return
chosen_agent = random.choice(candidates)
binding = PoolLoadbalancerAgentBinding()
binding.agent = chosen_agent
binding.pool_id = pool['id']
context.session.add(binding)
LOG.debug(_('Pool %(pool_id)s is scheduled to '
'lbaas agent %(agent_id)s'),
{'pool_id': pool['id'],
'agent_id': chosen_agent['id']})
return chosen_agent

View File

@ -55,6 +55,7 @@ def main():
cfg.CONF.register_opts(manager.OPTS)
# import interface options just in case the driver uses namespaces
cfg.CONF.register_opts(interface.OPTS)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF(project='neutron')

View File

@ -21,9 +21,12 @@ import weakref
from oslo.config import cfg
from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc
from neutron.common import constants
from neutron import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.services.loadbalancer.drivers.haproxy import (
agent_api,
@ -110,6 +113,12 @@ class LogicalDeviceCache(object):
class LbaasAgentManager(periodic_task.PeriodicTasks):
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
RPC_API_VERSION = '1.1'
def __init__(self, conf):
self.conf = conf
try:
@ -131,15 +140,46 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_PROCESS_ON_HOST,
ctx,
conf.host
)
self.agent_state = {
'binary': 'neutron-loadbalancer-agent',
'host': conf.host,
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
'configurations': {'device_driver': conf.device_driver,
'interface_driver': conf.interface_driver},
'agent_type': constants.AGENT_TYPE_LOADBALANCER,
'start_flag': True}
self.admin_state_up = True
self.context = context.get_admin_context_without_session()
self._setup_rpc()
self.needs_resync = False
self.cache = LogicalDeviceCache()
def _setup_rpc(self):
self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_PROCESS_ON_HOST,
self.context,
self.conf.host
)
self.state_rpc = agent_rpc.PluginReportStateAPI(
plugin_driver.TOPIC_PROCESS_ON_HOST)
report_interval = self.conf.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
heartbeat.start(interval=report_interval)
def _report_state(self):
try:
device_count = len(self.cache.devices)
self.agent_state['configurations']['devices'] = device_count
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
except Exception:
LOG.exception("Failed reporting state!")
def initialize_service_hook(self, started_by):
self.sync_state()
@ -228,3 +268,14 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
"""Handle RPC cast from plugin to destroy a pool if known to agent."""
if self.cache.get_by_pool_id(pool_id):
self.destroy_device(pool_id)
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
if payload['admin_state_up'] != self.admin_state_up:
self.admin_state_up = payload['admin_state_up']
if self.admin_state_up:
self.needs_resync = True
else:
for pool_id in self.cache.get_pool_ids():
self.destroy_device(pool_id)
LOG.info(_("agent_updated by server side %s!"), payload)

View File

@ -20,9 +20,13 @@ import uuid
from oslo.config import cfg
from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.db import agents_db
from neutron.db.loadbalancer import loadbalancer_db
from neutron.extensions import lbaas_agentscheduler
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
@ -37,19 +41,31 @@ ACTIVE_PENDING = (
constants.PENDING_UPDATE
)
AGENT_SCHEDULER_OPTS = [
cfg.StrOpt('loadbalancer_pool_scheduler_driver',
default='neutron.services.loadbalancer.agent_scheduler'
'.ChanceScheduler',
help=_('Driver to use for scheduling '
'pool to a default loadbalancer agent')),
]
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
# topic name for this particular agent implementation
TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
class LoadBalancerCallbacks(object):
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
return q_rpc.PluginRpcDispatcher([self])
return q_rpc.PluginRpcDispatcher(
[self, agents_db.AgentExtRpcCallback(self.plugin)])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
@ -61,6 +77,17 @@ class LoadBalancerCallbacks(object):
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]})
if not agents:
return []
elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s') % host)
pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']]
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
return [id for id, in qry]
def get_logical_device(self, context, pool_id=None, activate=True,
@ -185,40 +212,50 @@ class LoadBalancerCallbacks(object):
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
API_VERSION = '1.0'
BASE_RPC_API_VERSION = '1.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
def __init__(self, topic, host):
super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
self.host = host
def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION)
def reload_pool(self, context, pool_id):
def reload_pool(self, context, pool_id, host):
return self.cast(
context,
self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
topic=self.topic
self.make_msg('reload_pool', pool_id=pool_id, host=host),
topic='%s.%s' % (self.topic, host)
)
def destroy_pool(self, context, pool_id):
def destroy_pool(self, context, pool_id, host):
return self.cast(
context,
self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
topic=self.topic
self.make_msg('destroy_pool', pool_id=pool_id, host=host),
topic='%s.%s' % (self.topic, host)
)
def modify_pool(self, context, pool_id):
def modify_pool(self, context, pool_id, host):
return self.cast(
context,
self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
topic=self.topic
self.make_msg('modify_pool', pool_id=pool_id, host=host),
topic='%s.%s' % (self.topic, host)
)
def agent_updated(self, context, admin_state_up, host):
return self.cast(
context,
self.make_msg('agent_updated',
payload={'admin_state_up': admin_state_up}),
topic='%s.%s' % (self.topic, host),
version='1.1'
)
class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
def __init__(self, plugin):
self.agent_rpc = LoadBalancerAgentApi(
TOPIC_LOADBALANCER_AGENT,
cfg.CONF.host
)
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
self.callbacks = LoadBalancerCallbacks(plugin)
self.conn = rpc.create_connection(new=True)
@ -228,56 +265,85 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
fanout=False)
self.conn.consume_in_thread()
self.plugin = plugin
self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver)
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent:
raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
return agent['agent']
def create_vip(self, context, vip):
self.agent_rpc.reload_pool(context, vip['pool_id'])
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in ACTIVE_PENDING:
self.agent_rpc.reload_pool(context, vip['pool_id'])
self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
else:
self.agent_rpc.destroy_pool(context, vip['pool_id'])
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
self.agent_rpc.destroy_pool(context, vip['pool_id'])
agent = self.get_pool_agent(context, vip['pool_id'])
self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
def create_pool(self, context, pool):
if not self.pool_scheduler.schedule(self.plugin, context, pool):
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
# don't notify here because a pool needs a vip to be useful
pass
def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in ACTIVE_PENDING:
if pool['vip_id'] is not None:
self.agent_rpc.reload_pool(context, pool['id'])
self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
else:
self.agent_rpc.destroy_pool(context, pool['id'])
self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
def delete_pool(self, context, pool):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
if agent:
self.agent_rpc.destroy_pool(context, pool['id'],
agent['agent']['host'])
self.plugin._delete_db_pool(context, pool['id'])
self.agent_rpc.destroy_pool(context, pool['id'])
def create_member(self, context, member):
self.agent_rpc.modify_pool(context, member['pool_id'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def update_member(self, context, old_member, member):
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
self.agent_rpc.modify_pool(context, old_member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'])
agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id'])
if agent:
self.agent_rpc.modify_pool(context,
old_member['pool_id'],
agent['agent']['host'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
self.agent_rpc.modify_pool(context, member['pool_id'])
agent = self.get_pool_agent(context, member['pool_id'])
self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def update_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
# monitors are unused here because agent will fetch what is necessary
self.agent_rpc.modify_pool(context, pool_id)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
self.agent_rpc.modify_pool(context, pool_id)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
@ -285,7 +351,8 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
)
# healthmon_id is not used here
self.agent_rpc.modify_pool(context, pool_id)
agent = self.get_pool_agent(context, pool_id)
self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def create_health_monitor(self, context, health_monitor):
pass

View File

@ -58,7 +58,7 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver):
@log.log
def delete_pool(self, context, pool):
pass
self.plugin._delete_db_pool(context, pool["id"])
@log.log
def stats(self, context, pool_id):

View File

@ -23,6 +23,7 @@ from neutron.db.loadbalancer import loadbalancer_db
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer import agent_scheduler
LOG = logging.getLogger(__name__)
@ -39,7 +40,8 @@ cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
legacy.override_config(cfg.CONF, [('LBAAS', 'driver_fqn')])
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb,
agent_scheduler.LbaasAgentSchedulerDbMixin):
"""Implementation of the Neutron Loadbalancer Service Plugin.
@ -47,7 +49,12 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
Most DB related works are implemented in class
loadbalancer_db.LoadBalancerPluginDb.
"""
supported_extension_aliases = ["lbaas"]
supported_extension_aliases = ["lbaas", "lbaas_agent_scheduler"]
# lbaas agent notifiers to handle agent update operations;
# can be updated by plugin drivers while loading;
# will be extracted by neutron manager when loading service plugins;
agent_notifiers = {}
def __init__(self):
"""Initialization for the loadbalancer service plugin."""
@ -213,7 +220,7 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
# update the db and return the value from db
# else - return what we have in db
if stats_data:
super(LoadBalancerPlugin, self)._update_pool_stats(
super(LoadBalancerPlugin, self).update_pool_stats(
context,
pool_id,
stats_data

View File

@ -16,14 +16,17 @@
import contextlib
import logging
import os
import testtools
import mock
from oslo.config import cfg
import testtools
import webob.exc
from neutron.api.extensions import ExtensionMiddleware
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.common import config
from neutron import context
import neutron.db.l3_db # noqa
from neutron.db.loadbalancer import loadbalancer_db as ldb
import neutron.extensions
from neutron.extensions import loadbalancer
@ -46,34 +49,19 @@ ETCDIR = os.path.join(ROOTDIR, 'etc')
extensions_path = ':'.join(neutron.extensions.__path__)
_subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
def etcdir(*p):
return os.path.join(ETCDIR, *p)
class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
class LoadBalancerTestMixin(object):
resource_prefix_map = dict(
(k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
)
def setUp(self, core_plugin=None, lb_plugin=None):
service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
super(LoadBalancerPluginDbTestCase, self).setUp(
service_plugins=service_plugins
)
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.LOADBALANCER: self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
def _create_vip(self, fmt, name, pool_id, protocol, protocol_port,
admin_state_up, expected_res_status=None, **kwargs):
data = {'vip': {'name': name,
@ -97,7 +85,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def _create_pool(self, fmt, name, lb_method, protocol, admin_state_up,
expected_res_status=None, **kwargs):
data = {'pool': {'name': name,
'subnet_id': self._subnet_id,
'subnet_id': _subnet_id,
'lb_method': lb_method,
'protocol': protocol,
'admin_state_up': admin_state_up,
@ -151,12 +139,6 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
return res
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports']:
return self.api
else:
return self.ext_api
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
protocol='HTTP', protocol_port=80, admin_state_up=True,
@ -270,7 +252,43 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
self._delete('health_monitors', the_health_monitor['id'])
class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,
test_db_plugin.NeutronDbPluginV2TestCase):
def setUp(self, core_plugin=None, lb_plugin=None):
service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
super(LoadBalancerPluginDbTestCase, self).setUp(
service_plugins=service_plugins
)
self._subnet_id = _subnet_id
self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
get_lbaas_agent_patcher = mock.patch(
'neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin.get_lbaas_agent_hosting_pool')
mock_lbaas_agent = mock.MagicMock()
get_lbaas_agent_patcher.start().return_value = mock_lbaas_agent
mock_lbaas_agent.__getitem__.return_value = {'host': 'host'}
self.addCleanup(mock.patch.stopall)
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.LOADBALANCER: self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
class TestLoadBalancer(LoadBalancerPluginDbTestCase):
def setUp(self):
cfg.CONF.set_override('driver_fqn',
'neutron.services.loadbalancer.drivers.noop'
'.noop_driver.NoopLbaaSDriver',
group='LBAAS')
self.addCleanup(cfg.CONF.reset)
super(TestLoadBalancer, self).setUp()
def test_create_vip(self, **extras):
expected = {
'name': 'vip1',

View File

@ -95,6 +95,7 @@ class DummyServicePlugin(ServicePluginBase):
"""
supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
agent_notifiers = {'dummy': 'dummy_agent_notifier'}
def __init__(self):
self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()

View File

@ -1106,7 +1106,8 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
def test_router_add_to_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@ -1116,14 +1117,15 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
routers = [router1['router']['id']]
mock_l3.assert_called_with(
mock.ANY,
plugin.l3_agent_notifier.make_msg(
l3_notifier.make_msg(
'router_added_to_agent',
payload=routers),
topic='l3_agent.hosta')
def test_router_remove_from_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@ -1133,22 +1135,22 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
self._remove_router_from_l3_agent(hosta_id,
router1['router']['id'])
mock_l3.assert_called_with(
mock.ANY, plugin.l3_agent_notifier.make_msg(
mock.ANY, l3_notifier.make_msg(
'router_removed_from_agent',
payload={'router_id': router1['router']['id']}),
topic='l3_agent.hosta')
def test_agent_updated_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
with mock.patch.object(l3_notifier, 'cast') as mock_l3:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
mock_l3.assert_called_with(
mock.ANY, plugin.l3_agent_notifier.make_msg(
'agent_updated',
payload={'admin_state_up': False}),
mock.ANY, l3_notifier.make_msg(
'agent_updated', payload={'admin_state_up': False}),
topic='l3_agent.hosta')

View File

@ -53,9 +53,26 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
self.callbacks = plugin_driver.LoadBalancerCallbacks(
self.plugin_instance
)
get_lbaas_agents_patcher = mock.patch(
'neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
get_lbaas_agents_patcher.start()
# mocking plugin_driver create_pool() as it does nothing more than
# pool scheduling which is beyond the scope of this test case
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
'.plugin_driver.HaproxyOnHostPluginDriver'
'.create_pool').start()
self.addCleanup(mock.patch.stopall)
def test_get_ready_devices(self):
with self.vip() as vip:
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin.'
'list_pools_on_lbaas_agent') as mock_agent_pools:
mock_agent_pools.return_value = {
'pools': [{'id': vip['vip']['pool_id']}]}
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
@ -100,6 +117,12 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
self.assertEqual(ctx.session.query(ldb.Pool).count(), 3)
self.assertEqual(ctx.session.query(ldb.Vip).count(), 2)
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin'
'.list_pools_on_lbaas_agent') as mock_agent_pools:
mock_agent_pools.return_value = {'pools': [{'id': pools[0].id},
{'id': pools[1].id},
{'id': pools[2].id}]}
ready = self.callbacks.get_ready_devices(ctx)
self.assertEqual(len(ready), 2)
self.assertIn(pools[0].id, ready)
@ -119,7 +142,11 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
vip['vip']['id'],
{'vip': {'status': constants.INACTIVE}}
)
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin.'
'list_pools_on_lbaas_agent') as mock_agent_pools:
mock_agent_pools.return_value = {
'pools': [{'id': vip['vip']['pool_id']}]}
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
@ -135,7 +162,11 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
vip['vip']['pool_id'],
{'pool': {'status': constants.INACTIVE}}
)
with mock.patch('neutron.services.loadbalancer.agent_scheduler'
'.LbaasAgentSchedulerDbMixin.'
'list_pools_on_lbaas_agent') as mock_agent_pools:
mock_agent_pools.return_value = {
'pools': [{'id': vip['vip']['pool_id']}]}
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
@ -235,26 +266,26 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
self.api = plugin_driver.LoadBalancerAgentApi('topic')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
def test_init(self):
self.assertEqual(self.api.topic, 'topic')
self.assertEqual(self.api.host, 'host')
def _call_test_helper(self, method_name):
rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
rv = getattr(self.api, method_name)(mock.sentinel.context, 'test',
'host')
self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
topic='topic'
topic='topic.host'
)
self.mock_msg.assert_called_once_with(
method_name,
pool_id='the_id',
pool_id='test',
host='host'
)
@ -267,6 +298,21 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
def test_modify_pool(self):
self._call_test_helper('modify_pool')
def test_agent_updated(self):
rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
topic='topic.host',
version='1.1'
)
self.mock_msg.assert_called_once_with(
'agent_updated',
payload={'admin_state_up': True}
)
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self):
@ -276,6 +322,12 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value
# mocking plugin_driver create_pool() as it does nothing more than
# pool scheduling which is beyond the scope of this test case
mock.patch('neutron.services.loadbalancer.drivers.haproxy'
'.plugin_driver.HaproxyOnHostPluginDriver'
'.create_pool').start()
self.addCleanup(mock.patch.stopall)
def test_create_vip(self):
@ -284,7 +336,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
vip['vip']['pool_id'],
'host'
)
def test_update_vip(self):
@ -302,7 +355,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
vip['vip']['pool_id'],
'host'
)
self.assertEqual(
@ -319,7 +373,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
self.mock_api.destroy_pool.assert_called_once_with(
mock.ANY,
vip['vip']['pool_id']
vip['vip']['pool_id'],
'host'
)
def test_create_pool(self):
@ -334,7 +389,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
ctx = context.get_admin_context()
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
self.mock_api.destroy_pool.assert_called_once_with(
mock.ANY, pool['pool']['id'])
mock.ANY, pool['pool']['id'], 'host')
self.assertFalse(self.mock_api.reload_pool.called)
self.assertFalse(self.mock_api.modify_pool.called)
@ -352,7 +407,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
ctx = context.get_admin_context()
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY, pool['pool']['id'])
mock.ANY, pool['pool']['id'], 'host')
self.assertFalse(self.mock_api.destroy_pool.called)
self.assertFalse(self.mock_api.modify_pool.called)
@ -363,14 +418,14 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.mock_api.destroy_pool.assert_called_once_with(
mock.ANY, pool['pool']['id'])
mock.ANY, pool['pool']['id'], 'host')
def test_create_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id):
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY, pool_id)
mock.ANY, pool_id, 'host')
def test_update_member(self):
with self.pool() as pool:
@ -381,7 +436,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
self.plugin_instance.update_member(
ctx, member['member']['id'], member)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY, pool_id)
mock.ANY, pool_id, 'host')
def test_update_member_new_pool(self):
with self.pool() as pool1:
@ -397,8 +452,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
member)
self.assertEqual(2, self.mock_api.modify_pool.call_count)
self.mock_api.modify_pool.assert_has_calls(
[mock.call(mock.ANY, pool1_id),
mock.call(mock.ANY, pool2_id)])
[mock.call(mock.ANY, pool1_id, 'host'),
mock.call(mock.ANY, pool2_id, 'host')])
def test_delete_member(self):
with self.pool() as pool:
@ -411,7 +466,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY, pool_id)
mock.ANY, pool_id, 'host')
def test_create_pool_health_monitor(self):
with self.pool() as pool:
@ -422,7 +477,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
hm,
pool_id)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY, pool_id)
mock.ANY, pool_id, 'host')
def test_delete_pool_health_monitor(self):
with self.pool() as pool:
@ -436,7 +491,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
self.plugin_instance.delete_pool_health_monitor(
ctx, hm['health_monitor']['id'], pool_id)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY, pool_id)
mock.ANY, pool_id, 'host')
def test_update_health_monitor_associated_with_pool(self):
with self.health_monitor(type='HTTP') as monitor:
@ -457,7 +512,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
self.assertEqual(res.status_int, 201)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY,
pool['pool']['id']
pool['pool']['id'],
'host'
)
self.mock_api.reset_mock()
@ -471,5 +527,6 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
req.get_response(self.ext_api)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY,
pool['pool']['id']
pool['pool']['id'],
'host'
)

View File

@ -0,0 +1,200 @@
# Copyright (c) 2013 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from webob import exc
from neutron.api import extensions
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron import context
from neutron.extensions import agent
from neutron.extensions import lbaas_agentscheduler
from neutron import manager
from neutron.plugins.common import constants as plugin_const
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit.openvswitch import test_agent_scheduler
from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_extensions
LBAAS_HOSTA = 'hosta'
class AgentSchedulerTestMixIn(test_agent_scheduler.AgentSchedulerTestMixIn):
def _list_pools_hosted_by_lbaas_agent(self, agent_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/agents/%s/%s.%s" % (agent_id,
lbaas_agentscheduler.LOADBALANCER_POOLS,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
def _get_lbaas_agent_hosting_pool(self, pool_id,
expected_code=exc.HTTPOk.code,
admin_context=True):
path = "/lb/pools/%s/%s.%s" % (pool_id,
lbaas_agentscheduler.LOADBALANCER_AGENT,
self.fmt)
return self._request_list(path, expected_code=expected_code,
admin_context=admin_context)
class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
AgentSchedulerTestMixIn,
test_db_loadbalancer.LoadBalancerTestMixin,
test_plugin.NeutronDbPluginV2TestCase):
fmt = 'json'
plugin_str = ('neutron.plugins.openvswitch.'
'ovs_neutron_plugin.OVSNeutronPluginV2')
def setUp(self):
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
service_plugins = {
'lb_plugin_name': test_db_loadbalancer.DB_LB_PLUGIN_KLASS}
super(LBaaSAgentSchedulerTestCase, self).setUp(
self.plugin_str, service_plugins=service_plugins)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
# Add the resources to the global attribute map
# This is done here as the setup process won't
# initialize the main API router which extends
# the global attribute map
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.restore_attribute_map)
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_report_states(self):
self._register_agent_states(lbaas_agents=True)
agents = self._list_agents()
self.assertEqual(6, len(agents['agents']))
def test_pool_scheduling_on_pool_creation(self):
self._register_agent_states(lbaas_agents=True)
with self.pool() as pool:
lbaas_agent = self._get_lbaas_agent_hosting_pool(
pool['pool']['id'])
self.assertIsNotNone(lbaas_agent)
self.assertEqual(lbaas_agent['agent']['agent_type'],
constants.AGENT_TYPE_LOADBALANCER)
pools = self._list_pools_hosted_by_lbaas_agent(
lbaas_agent['agent']['id'])
self.assertEqual(1, len(pools['pools']))
self.assertEqual(pool['pool'], pools['pools'][0])
def test_schedule_poll_with_disabled_agent(self):
lbaas_hosta = {
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
'configurations': {'device_driver': 'device_driver',
'interface_driver': 'interface_driver'},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta)
with self.pool() as pool:
lbaas_agent = self._get_lbaas_agent_hosting_pool(
pool['pool']['id'])
self.assertIsNotNone(lbaas_agent)
agents = self._list_agents()
self._disable_agent(agents['agents'][0]['id'])
pool = {'pool': {'name': 'test',
'subnet_id': 'test',
'lb_method': 'ROUND_ROBIN',
'protocol': 'HTTP',
'admin_state_up': True,
'tenant_id': 'test',
'description': 'test'}}
lbaas_plugin = manager.NeutronManager.get_service_plugins()[
plugin_const.LOADBALANCER]
self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
lbaas_plugin.create_pool, self.adminContext, pool)
def test_schedule_poll_with_down_agent(self):
lbaas_hosta = {
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
'configurations': {'device_driver': 'device_driver',
'interface_driver': 'interface_driver'},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta)
is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'
with mock.patch(is_agent_down_str) as mock_is_agent_down:
mock_is_agent_down.return_value = False
with self.pool() as pool:
lbaas_agent = self._get_lbaas_agent_hosting_pool(
pool['pool']['id'])
self.assertIsNotNone(lbaas_agent)
with mock.patch(is_agent_down_str) as mock_is_agent_down:
mock_is_agent_down.return_value = True
pool = {'pool': {'name': 'test',
'subnet_id': 'test',
'lb_method': 'ROUND_ROBIN',
'protocol': 'HTTP',
'admin_state_up': True,
'tenant_id': 'test',
'description': 'test'}}
lbaas_plugin = manager.NeutronManager.get_service_plugins()[
plugin_const.LOADBALANCER]
self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
lbaas_plugin.create_pool,
self.adminContext, pool)
def test_pool_unscheduling_on_pool_deletion(self):
self._register_agent_states(lbaas_agents=True)
with self.pool(no_delete=True) as pool:
lbaas_agent = self._get_lbaas_agent_hosting_pool(
pool['pool']['id'])
self.assertIsNotNone(lbaas_agent)
self.assertEqual(lbaas_agent['agent']['agent_type'],
constants.AGENT_TYPE_LOADBALANCER)
pools = self._list_pools_hosted_by_lbaas_agent(
lbaas_agent['agent']['id'])
self.assertEqual(1, len(pools['pools']))
self.assertEqual(pool['pool'], pools['pools'][0])
req = self.new_delete_request('pools',
pool['pool']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
pools = self._list_pools_hosted_by_lbaas_agent(
lbaas_agent['agent']['id'])
self.assertEqual(0, len(pools['pools']))
def test_pool_scheduling_non_admin_access(self):
self._register_agent_states(lbaas_agents=True)
with self.pool() as pool:
self._get_lbaas_agent_hosting_pool(
pool['pool']['id'],
expected_code=exc.HTTPForbidden.code,
admin_context=False)
self._list_pools_hosted_by_lbaas_agent(
'fake_id',
expected_code=exc.HTTPForbidden.code,
admin_context=False)
class LBaaSAgentSchedulerTestCaseXML(LBaaSAgentSchedulerTestCase):
fmt = 'xml'

View File

@ -45,6 +45,8 @@ DHCP_HOSTA = 'hosta'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
DHCP_HOST1 = 'host1'
LBAAS_HOSTA = 'hosta'
LBAAS_HOSTB = 'hostb'
class AgentTestExtensionManager(object):
@ -83,7 +85,7 @@ class AgentDBTestMixIn(object):
self.assertEqual(agent_res.status_int, expected_res_status)
return agent_res
def _register_agent_states(self):
def _register_agent_states(self, lbaas_agents=False):
"""Register two L3 agents and two DHCP agents."""
l3_hosta = {
'binary': 'neutron-l3-agent',
@ -110,6 +112,16 @@ class AgentDBTestMixIn(object):
'agent_type': constants.AGENT_TYPE_DHCP}
dhcp_hostc = copy.deepcopy(dhcp_hosta)
dhcp_hostc['host'] = DHCP_HOSTC
lbaas_hosta = {
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
'configurations': {'device_driver': 'device_driver',
'interface_driver': 'interface_driver',
},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
lbaas_hostb = copy.deepcopy(lbaas_hosta)
lbaas_hostb['host'] = LBAAS_HOSTB
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': l3_hosta},
@ -123,7 +135,18 @@ class AgentDBTestMixIn(object):
callback.report_state(self.adminContext,
agent_state={'agent_state': dhcp_hostc},
time=timeutils.strtime())
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
res = [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
if lbaas_agents:
callback.report_state(self.adminContext,
agent_state={'agent_state': lbaas_hosta},
time=timeutils.strtime())
callback.report_state(self.adminContext,
agent_state={'agent_state': lbaas_hostb},
time=timeutils.strtime())
res += [lbaas_hosta, lbaas_hostb]
return res
def _register_one_dhcp_agent(self):
"""Register one DHCP agent."""

View File

@ -47,6 +47,11 @@ class MultiServiceCorePlugin(object):
supported_extension_aliases = ['lbaas', 'dummy']
class CorePluginWithAgentNotifiers(object):
agent_notifiers = {'l3': 'l3_agent_notifier',
'dhcp': 'dhcp_agent_notifier'}
class NeutronManagerTestCase(base.BaseTestCase):
def setUp(self):
@ -121,3 +126,16 @@ class NeutronManagerTestCase(base.BaseTestCase):
self.assertIsNotNone(validate_pre_plugin_load())
cfg.CONF.set_override('core_plugin', 'dummy.plugin')
self.assertIsNone(validate_pre_plugin_load())
def test_manager_gathers_agent_notifiers_from_service_plugins(self):
cfg.CONF.set_override("service_plugins",
["neutron.tests.unit.dummy_plugin."
"DummyServicePlugin"])
cfg.CONF.set_override("core_plugin",
"neutron.tests.unit.test_neutron_manager."
"CorePluginWithAgentNotifiers")
expected = {'l3': 'l3_agent_notifier',
'dhcp': 'dhcp_agent_notifier',
'dummy': 'dummy_agent_notifier'}
core_plugin = NeutronManager.get_plugin()
self.assertEqual(expected, core_plugin.agent_notifiers)