RPC support for Linux Bridge Plugin and Agent

blueprint scalable-agent-comms

This is the first stage of the blueprint. This adds support to the linux bridge
plugin.

The development followed the design described in:
https://docs.google.com/document/d/1MbcBA2Os4b98ybdgAw2qe_68R1NG6KMh8zdZKgOlpvg/edit?pli=1

Change-Id: I4004c05a63ce49f020c2016c8763e73238b465a7
This commit is contained in:
Gary Kotton 2012-07-10 11:28:08 -04:00
parent de1d7d9537
commit dea9ecf4b2
19 changed files with 686 additions and 103 deletions

View File

@ -16,9 +16,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import os
import sys
sys.path.insert(0, os.getcwd())
from quantum.server import main as server
eventlet.monkey_patch()
server()

View File

@ -39,41 +39,9 @@ api_paste_config = api-paste.ini
# Maximum amount of retries to generate a unique MAC address
# mac_generation_retries = 16
[QUOTAS]
# number of networks allowed per tenant
# quota_network = 10
# number of subnets allowed per tenant
# quota_subnet = 10
# number of ports allowed per tenant
# quota_port = 50
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
# ============ Notification System Options =====================
# Notifications can be sent when network/subnet/port are create, updated or deleted.
# There are four methods of sending notifications, logging (via the
# log_file directive), rpc (via a message queue),
# noop (no notifications sent, the default) or list of them
# Defined in notifier api
# notification_driver = quantum.openstack.common.notifier.no_op_notifier
# default_notification_level = INFO
# myhost = myhost.com
# default_publisher_id = $myhost
# Defined in rabbit_notifier for rpc way
# notification_topics = notifications
# Defined in list_notifier
# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
# Defined in rpc __init__
# RPC configuration options. Defined in rpc __init__
# The messaging module to use, defaults to kombu.
# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu
# rpc_backend = quantum.openstack.common.notifier.rpc.impl_kombu
# Size of RPC thread pool
# rpc_thread_pool_size = 64,
# Size of RPC connection pool
@ -85,8 +53,9 @@ api_paste_config = api-paste.ini
# Modules of exceptions that are permitted to be recreated
# upon receiving exception data from an rpc call.
# allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception
# AMQP exchange to connect to if using RabbitMQ or Qpid
# control_exchange = nova
# AMQP exchange to connect to if using RabbitMQ or QPID
control_exchange = quantum
# If passed, use a fake RabbitMQ provider
# fake_rabbit = False
@ -152,11 +121,35 @@ api_paste_config = api-paste.ini
# ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP.
# The "host" option should point or resolve to this address.
# rpc_zmq_bind_address = *
# MatchMaker driver
# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost
# ZeroMQ receiver listening port
# rpc_zmq_port = 9501
# Number of ZeroMQ contexts, defaults to 1
# rpc_zmq_contexts = 1
# Directory for holding IPC sockets
# rpc_zmq_ipc_dir = /var/run/openstack
[QUOTAS]
# number of networks allowed per tenant
# quota_network = 10
# number of subnets allowed per tenant
# quota_subnet = 10
# number of ports allowed per tenant
# quota_port = 50
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
# ============ Notification System Options =====================
# Notifications can be sent when network/subnet/port are create, updated or deleted.
# There are four methods of sending notifications, logging (via the
# log_file directive), rpc (via a message queue),
# noop (no notifications sent, the default) or list of them
# Defined in notifier api
# notification_driver = quantum.openstack.common.notifier.no_op_notifier
# default_notification_level = INFO
# myhost = myhost.com
# default_publisher_id = $myhost
# Defined in rabbit_notifier for rpc way
# notification_topics = notifications
# Defined in list_notifier
# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier

View File

@ -1,6 +1,6 @@
[VLANS]
vlan_start=1000
vlan_end=3000
vlan_start = 1000
vlan_end = 3000
[DATABASE]
# This line MUST be changed to actually run the plugin.
@ -25,3 +25,7 @@ polling_interval = 2
# Change to "sudo quantum-rootwrap" to limit commands that can be run
# as root.
root_helper = sudo
# Use Quantumv2 API
# target_v2_api = False
# Use RPC messaging to interface between agent and plugin
# rpc = True

View File

@ -17,9 +17,12 @@
#
# @author: Juliano Martinez, Locaweb.
import fcntl
import logging
import os
import shlex
import socket
import struct
import subprocess
LOG = logging.getLogger(__name__)
@ -50,3 +53,14 @@ def execute(cmd, root_helper=None, process_input=None, addl_env=None,
raise RuntimeError(m)
return return_stderr and (_stdout, _stderr) or _stdout
def get_interface_mac(interface):
DEVICE_NAME_LEN = 15
MAC_START = 18
MAC_END = 24
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927,
struct.pack('256s', interface[:DEVICE_NAME_LEN]))
return ''.join(['%02x:' % ord(char)
for char in info[MAC_START:MAC_END]])[:-1]

36
quantum/agent/rpc.py Normal file
View File

@ -0,0 +1,36 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from quantum.common import topics
from quantum.openstack.common import rpc
def create_consumers(dispatcher, prefix, topic_details):
"""Create agent RPC consumers.
:param dispatcher: The dispatcher to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
:param topic_details: A list of topics. Each topic has a name and a
operation.
:returns: A common Connection.
"""
connection = rpc.create_connection(new=True)
for topic, operation in topic_details:
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, dispatcher, fanout=True)
connection.consume_in_thread()
return connection

42
quantum/common/topics.py Normal file
View File

@ -0,0 +1,42 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
NETWORK = 'network'
SUBNET = 'subnet'
PORT = 'port'
CREATE = 'create'
DELETE = 'delete'
UPDATE = 'update'
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
def get_topic_name(prefix, table, operation):
"""Create a topic name.
The topic name needs to be synced between the agent and the
plugin. The plugin will send a fanout message to all of the
listening agents so that the agents in turn can perform their
updates accordingly.
:param prefix: Common prefix for the plugin/agent message queues.
:param table: The table in question (NETWORK, SUBNET, PORT).
:param operation: The operation that invokes notification (CREATE,
DELETE, UPDATE)
:returns: The topic name.
"""
return '%s-%s-%s' % (prefix, table, operation)

View File

@ -464,7 +464,7 @@ def _is_opt_registered(opts, opt):
:raises: DuplicateOptError if a naming conflict is detected
"""
if opt.dest in opts:
if opts[opt.dest]['opt'] is not opt:
if opts[opt.dest]['opt'] != opt:
raise DuplicateOptError(opt.name)
return True
else:
@ -527,6 +527,9 @@ class Opt(object):
else:
self.deprecated_name = None
def __ne__(self, another):
return vars(self) != vars(another)
def _get_from_config_parser(self, cparser, section):
"""Retrieves the option value from a MultiConfigParser object.

View File

@ -19,7 +19,6 @@
Exceptions common to OpenStack projects
"""
import itertools
import logging

View File

@ -257,16 +257,18 @@ class PublishErrorsHandler(logging.Handler):
dict(error=record.msg))
def handle_exception(type, value, tb):
extra = {}
if CONF.verbose:
extra['exc_info'] = (type, value, tb)
getLogger().critical(str(value), **extra)
def _create_logging_excepthook(product_name):
def logging_excepthook(type, value, tb):
extra = {}
if CONF.verbose:
extra['exc_info'] = (type, value, tb)
getLogger(product_name).critical(str(value), **extra)
return logging_excepthook
def setup(product_name):
"""Setup logging."""
sys.excepthook = handle_exception
sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
@ -357,17 +359,6 @@ def _setup_logging_from_conf(product_name):
for handler in log_root.handlers:
logger.addHandler(handler)
# NOTE(jkoelker) Clear the handlers for the root logger that was setup
# by basicConfig in nova/__init__.py and install the
# NullHandler.
root = logging.getLogger()
for handler in root.handlers:
root.removeHandler(handler)
handler = NullHandler()
handler.setFormatter(logging.Formatter())
root.addHandler(handler)
_loggers = {}
@ -405,8 +396,12 @@ class LegacyFormatter(logging.Formatter):
def format(self, record):
"""Uses contextstring if request_id is set, otherwise default."""
if 'instance' not in record.__dict__:
record.__dict__['instance'] = ''
# NOTE(sdague): default the fancier formating params
# to an empty string so we don't throw an exception if
# they get used
for key in ('instance', 'color'):
if key not in record.__dict__:
record.__dict__[key] = ''
if record.__dict__.get('request_id', None):
self._fmt = CONF.logging_context_format_string

View File

@ -108,7 +108,7 @@ class Connection(object):
"""
raise NotImplementedError()
def create_consumer(self, conf, topic, proxy, fanout=False):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer on this connection.
A consumer is associated with a message queue on the backend message
@ -117,7 +117,6 @@ class Connection(object):
off of the queue will determine which method gets called on the proxy
object.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic. For example, all instances of nova-compute consume
@ -133,7 +132,7 @@ class Connection(object):
"""
raise NotImplementedError()
def create_worker(self, conf, topic, proxy, pool_name):
def create_worker(self, topic, proxy, pool_name):
"""Create a worker on this connection.
A worker is like a regular consumer of messages directed to a
@ -143,7 +142,6 @@ class Connection(object):
be asked to process it. Load is distributed across the members
of the pool in round-robin fashion.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic.

View File

@ -329,7 +329,7 @@ class Connection(object):
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay

View File

@ -21,7 +21,6 @@ Time related utilities and helper functions.
import calendar
import datetime
import time
import iso8601

View File

@ -30,10 +30,18 @@ import subprocess
import sys
import time
import eventlet
import pyudev
from sqlalchemy.ext.sqlsoup import SqlSoup
from quantum.openstack.common import cfg
from quantum.agent.rpc import create_consumers
from quantum.common import config as logging_config
from quantum.common import topics
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.common import config
from quantum.agent.linux import utils
@ -312,25 +320,97 @@ class LinuxBridge:
LOG.debug("Done deleting subinterface %s" % interface)
class LinuxBridgeQuantumAgent:
class PluginApi(proxy.RpcProxy):
'''Agent side of the linux bridge rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id),
topic=self.topic)
class LinuxBridgeRpcCallbacks():
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
def __init__(self, context, linux_br):
self.context = context
self.linux_br = linux_br
def network_delete(self, context, **kwargs):
LOG.debug("network_delete received")
network_id = kwargs.get('network_id')
bridge_name = self.linux_br.get_bridge_name(network_id)
# (TODO) garyk delete the bridge interface
LOG.debug("Delete %s", bridge_name)
def port_update(self, context, **kwargs):
LOG.debug("port_update received")
port = kwargs.get('port')
if port['admin_state_up']:
vlan_id = kwargs.get('vlan_id')
# create the networking for the port
self.linux_br.add_interface(port['network_id'],
vlan_id,
port['id'])
else:
bridge_name = self.linux_br.get_bridge_name(port['network_id'])
tap_device_name = self.linux_br.get_tap_device_name(port['id'])
self.linux_br.remove_interface(bridge_name, tap_device_name)
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
class LinuxBridgeQuantumAgentDB:
def __init__(self, br_name_prefix, physical_interface, polling_interval,
reconnect_interval, root_helper, target_v2_api):
reconnect_interval, root_helper, target_v2_api,
db_connection_url):
self.polling_interval = polling_interval
self.reconnect_interval = reconnect_interval
self.root_helper = root_helper
self.setup_linux_bridge(br_name_prefix, physical_interface)
self.db_connected = False
self.target_v2_api = target_v2_api
self.reconnect_interval = reconnect_interval
self.db_connected = False
self.db_connection_url = db_connection_url
def setup_linux_bridge(self, br_name_prefix, physical_interface):
self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
self.root_helper)
def process_port_binding(self, port_id, network_id, interface_id,
vlan_id):
def process_port_binding(self, network_id, interface_id, vlan_id):
return self.linux_br.add_interface(network_id, vlan_id, interface_id)
def remove_port_binding(self, network_id, interface_id):
bridge_name = self.linux_br.get_bridge_name(network_id)
tap_device_name = self.linux_br.get_tap_device_name(interface_id)
return self.linux_br.remove_interface(bridge_name, tap_device_name)
def process_unplugged_interfaces(self, plugged_interfaces):
"""
If there are any tap devices that are not corresponding to the
@ -434,8 +514,7 @@ class LinuxBridgeQuantumAgent:
interface_id = pb['interface_id']
vlan_id = str(vlan_bindings[pb['network_id']]['vlan_id'])
if self.process_port_binding(port_id,
pb['network_id'],
if self.process_port_binding(pb['network_id'],
interface_id,
vlan_id):
if self.target_v2_api:
@ -466,7 +545,7 @@ class LinuxBridgeQuantumAgent:
return {VLAN_BINDINGS: vlan_bindings,
PORT_BINDINGS: port_bindings}
def daemon_loop(self, db_connection_url):
def daemon_loop(self):
old_vlan_bindings = {}
old_port_bindings = []
self.db_connected = False
@ -474,7 +553,7 @@ class LinuxBridgeQuantumAgent:
while True:
if not self.db_connected:
time.sleep(self.reconnect_interval)
db = SqlSoup(db_connection_url)
db = SqlSoup(self.db_connection_url)
self.db_connected = True
LOG.info("Connecting to database \"%s\" on %s" %
(db.engine.url.database, db.engine.url.host))
@ -486,6 +565,158 @@ class LinuxBridgeQuantumAgent:
time.sleep(self.polling_interval)
class LinuxBridgeQuantumAgentRPC:
def __init__(self, br_name_prefix, physical_interface, polling_interval,
root_helper):
self.polling_interval = polling_interval
self.root_helper = root_helper
self.setup_linux_bridge(br_name_prefix, physical_interface)
self.setup_rpc(physical_interface)
def setup_rpc(self, physical_interface):
mac = utils.get_interface_mac(physical_interface)
self.agent_id = '%s%s' % ('lb', (mac.replace(":", "")))
self.topic = topics.AGENT
self.plugin_rpc = PluginApi(topics.PLUGIN)
# RPC network init
self.context = context.RequestContext('quantum', 'quantum',
is_admin=False)
# Handle updates from service
self.callbacks = LinuxBridgeRpcCallbacks(self.context,
self.linux_br)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
# Define the listening consumers for the agent
consumers = [[topics.PORT, topics.UPDATE],
[topics.NETWORK, topics.DELETE]]
self.connection = create_consumers(self.dispatcher, self.topic,
consumers)
self.udev = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(self.udev)
monitor.filter_by('net')
def setup_linux_bridge(self, br_name_prefix, physical_interface):
self.linux_br = LinuxBridge(br_name_prefix, physical_interface,
self.root_helper)
def process_port_binding(self, network_id, interface_id, vlan_id):
return self.linux_br.add_interface(network_id, vlan_id, interface_id)
def remove_port_binding(self, network_id, interface_id):
bridge_name = self.linux_br.get_bridge_name(network_id)
tap_device_name = self.linux_br.get_tap_device_name(interface_id)
return self.linux_br.remove_interface(bridge_name, tap_device_name)
def update_devices(self, registered_devices):
devices = self.udev_get_all_tap_devices()
if devices == registered_devices:
return
added = devices - registered_devices
removed = registered_devices - devices
return {'current': devices,
'added': added,
'removed': removed}
def udev_get_all_tap_devices(self):
devices = set()
for device in self.udev.list_devices(subsystem='net'):
name = self.udev_get_name(device)
if self.is_tap_device(name):
devices.add(name)
return devices
def is_tap_device(self, name):
return name.startswith(TAP_INTERFACE_PREFIX)
def udev_get_name(self, device):
return device.sys_name
def process_network_devices(self, device_info):
resync_a = False
resync_b = False
if 'added' in device_info:
resync_a = self.treat_devices_added(device_info['added'])
if 'removed' in device_info:
resync_b = self.treat_devices_removed(device_info['removed'])
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
def treat_devices_added(self, devices):
resync = False
for device in devices:
LOG.info("Port %s added", device)
try:
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("Unable to get port details for %s: %s", device, e)
resync = True
continue
if 'port_id' in details:
LOG.info("Port %s updated. Details: %s", device, details)
if details['admin_state_up']:
# create the networking for the port
self.process_port_binding(details['network_id'],
details['port_id'],
details['vlan_id'])
else:
self.remove_port_binding(details['network_id'],
details['port_id'])
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def treat_devices_removed(self, devices):
resync = False
for device in devices:
LOG.info("Attachment %s removed", device)
try:
details = self.plugin_rpc.update_device_down(self.context,
device,
self.agent_id)
except Exception as e:
LOG.debug("port_removed failed for %s: %s", device, e)
resync = True
if details['exists']:
LOG.info("Port %s updated.", device)
# Nothing to do regarding local networking
else:
LOG.debug("Device %s not defined on plugin", device)
return resync
def daemon_loop(self):
sync = True
devices = set()
LOG.info("LinuxBridge Agent RPC Daemon Started!")
while True:
start = time.time()
if sync:
LOG.info("Agent out of sync with plugin!")
devices.clear()
sync = False
device_info = self.update_devices(devices)
# notify plugin about device deltas
if device_info:
LOG.debug("Agent loop has new devices!")
# If treat devices fails - indicates must resync with plugin
sync = self.process_network_devices(device_info)
devices = device_info['current']
# sleep till end of polling interval
elapsed = (time.time() - start)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval (%s vs. %s)!",
self.polling_interval, elapsed)
def main():
cfg.CONF(args=sys.argv, project='quantum')
@ -497,15 +728,29 @@ def main():
polling_interval = cfg.CONF.AGENT.polling_interval
reconnect_interval = cfg.CONF.DATABASE.reconnect_interval
root_helper = cfg.CONF.AGENT.root_helper
'Establish database connection and load models'
db_connection_url = cfg.CONF.DATABASE.sql_connection
plugin = LinuxBridgeQuantumAgent(br_name_prefix, physical_interface,
polling_interval, reconnect_interval,
root_helper, cfg.CONF.AGENT.target_v2_api)
LOG.info("Agent initialized successfully, now running... ")
plugin.daemon_loop(db_connection_url)
rpc = cfg.CONF.AGENT.rpc
if not cfg.CONF.AGENT.target_v2_api:
rpc = False
if rpc:
plugin = LinuxBridgeQuantumAgentRPC(br_name_prefix,
physical_interface,
polling_interval,
root_helper)
else:
db_connection_url = cfg.CONF.DATABASE.sql_connection
target_v2_api = cfg.CONF.AGENT.target_v2_api
plugin = LinuxBridgeQuantumAgentDB(br_name_prefix,
physical_interface,
polling_interval,
reconnect_interval,
root_helper,
target_v2_api,
db_connection_url)
LOG.info("Agent initialized successfully, now running... ")
plugin.daemon_loop()
sys.exit(0)
if __name__ == "__main__":
eventlet.monkey_patch()
main()

View File

@ -39,6 +39,7 @@ agent_opts = [
cfg.IntOpt('polling_interval', default=2),
cfg.StrOpt('root_helper', default='sudo'),
cfg.BoolOpt('target_v2_api', default=False),
cfg.BoolOpt('rpc', default=True),
]

View File

@ -20,8 +20,10 @@ import logging
from sqlalchemy import func
from sqlalchemy.orm import exc
from quantum.api import api_common
from quantum.common import exceptions as q_exc
import quantum.db.api as db
from quantum.db import models_v2
from quantum.openstack.common import cfg
from quantum.plugins.linuxbridge.common import config
from quantum.plugins.linuxbridge.common import exceptions as c_exc
@ -281,3 +283,31 @@ def update_vlan_binding(netid, newvlanid=None):
return binding
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=netid)
def get_port_from_device(device):
"""Get port from database"""
LOG.debug("get_port_from_device() called")
session = db.get_session()
ports = session.query(models_v2.Port).all()
if not ports:
return
for port in ports:
if port['id'].startswith(device):
return port
return
def set_port_status(port_id, status):
"""Set the port status"""
LOG.debug("set_port_status as %s called", status)
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
port['status'] = status
if status == api_common.OperationalStatus.DOWN:
port['device_id'] = ''
session.merge(port)
session.flush()
except exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id)

View File

@ -15,15 +15,114 @@
import logging
from quantum.api import api_common
from quantum.api.v2 import attributes
from quantum.common import topics
from quantum.db import db_base_plugin_v2
from quantum.db import models_v2
from quantum.openstack.common import context
from quantum.openstack.common import cfg
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import dispatcher
from quantum.openstack.common.rpc import proxy
from quantum.plugins.linuxbridge.db import l2network_db as cdb
from quantum import policy
LOG = logging.getLogger(__name__)
class LinuxBridgeRpcCallbacks():
# Set RPC API version to 1.0 by default.
RPC_API_VERSION = '1.0'
# Device names start with "tap"
TAP_PREFIX_LEN = 3
def __init__(self, context):
self.context = context
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
If a manager would like to set an rpc API version, or support more than
one class as the target of rpc messages, override this method.
'''
return dispatcher.RpcDispatcher([self])
def get_device_details(self, context, **kwargs):
"""Agent requests device details"""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %s details requested from %s", device, agent_id)
port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
if port:
vlan_binding = cdb.get_vlan_binding(port['network_id'])
entry = {'device': device,
'vlan_id': vlan_binding['vlan_id'],
'network_id': port['network_id'],
'port_id': port['id'],
'admin_state_up': port['admin_state_up']}
# Set the port status to UP
cdb.set_port_status(port['id'], api_common.OperationalStatus.UP)
else:
entry = {'device': device}
LOG.debug("%s can not be found in database", device)
return entry
def update_device_down(self, context, **kwargs):
"""Device no longer exists on agent"""
# (TODO) garyk - live migration and port status
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %s no longer exists on %s", device, agent_id)
port = cdb.get_port_from_device(device[self.TAP_PREFIX_LEN:])
if port:
entry = {'device': device,
'exists': True}
# Set port status to DOWN
cdb.set_port_status(port['id'], api_common.OperationalStatus.DOWN)
else:
entry = {'device': device,
'exists': False}
LOG.debug("%s can not be found in database", device)
return entry
class AgentNotifierApi(proxy.RpcProxy):
'''Agent side of the linux bridge rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(AgentNotifierApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
def network_delete(self, context, network_id):
self.fanout_cast(context,
self.make_msg('network_delete',
network_id=network_id),
topic=self.topic_network_delete)
def port_update(self, context, port, vlan_id):
self.fanout_cast(context,
self.make_msg('port_update',
port=port,
vlan_id=vlan_id),
topic=self.topic_port_update)
class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
"""Implement the Quantum abstractions using Linux bridging.
@ -42,8 +141,27 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
def __init__(self):
cdb.initialize(base=models_v2.model_base.BASEV2)
self.rpc = cfg.CONF.AGENT.rpc
if cfg.CONF.AGENT.rpc and cfg.CONF.AGENT.target_v2_api:
self.setup_rpc()
if not cfg.CONF.AGENT.target_v2_api:
self.rpc = False
LOG.debug("Linux Bridge Plugin initialization complete")
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.context = context.RequestContext('quantum', 'quantum',
is_admin=False)
self.conn = rpc.create_connection(new=True)
self.callbacks = LinuxBridgeRpcCallbacks(self.context)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
# TODO(rkukura) Use core mechanism for attribute authorization
# when available.
@ -91,6 +209,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
vlan_binding = cdb.get_vlan_binding(id)
result = super(LinuxBridgePluginV2, self).delete_network(context, id)
cdb.release_vlanid(vlan_binding['vlan_id'])
if self.rpc:
self.notifier.network_delete(self.context, id)
return result
def get_network(self, context, id, fields=None, verbose=None):
@ -106,3 +226,15 @@ class LinuxBridgePluginV2(db_base_plugin_v2.QuantumDbPluginV2):
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
return [self._fields(net, fields) for net in nets]
def update_port(self, context, id, port):
if self.rpc:
original_port = super(LinuxBridgePluginV2, self).get_port(context,
id)
port = super(LinuxBridgePluginV2, self).update_port(context, id, port)
if self.rpc:
if original_port['admin_state_up'] != port['admin_state_up']:
vlan_binding = cdb.get_vlan_binding(port['network_id'])
self.notifier.port_update(self.context, port,
vlan_binding['vlan_id'])
return port

View File

@ -62,8 +62,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
device_name, str(vlan_id))
new_network[lconst.NET_ID], device_name, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
@ -101,8 +100,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
interface_id, str(vlan_id))
new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
@ -140,8 +138,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
interface_id, str(vlan_id))
new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
@ -283,8 +280,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
interface_id, str(vlan_id))
new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
self._linuxbridge_plugin.unplug_interface(tenant_id,
@ -347,8 +343,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
vlan_id = vlan_bind[lconst.VLANID]
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
interface_id, str(vlan_id))
new_network[lconst.NET_ID], interface_id, str(vlan_id))
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
self._linuxbridge_plugin.unplug_interface(tenant_id,
@ -412,6 +407,7 @@ class LinuxBridgeAgentTest(unittest.TestCase):
self.gw_name_prefix = "gw-"
self.tap_name_prefix = "tap"
self.v2 = True
self.db_connection = 'sqlite://'
self._linuxbridge_plugin = LinuxBridgePlugin.LinuxBridgePlugin()
try:
fh = open(self.config_file)
@ -430,13 +426,15 @@ class LinuxBridgeAgentTest(unittest.TestCase):
self._linuxbridge = linux_agent.LinuxBridge(self.br_name_prefix,
self.physical_interface,
self.root_helper)
self._linuxbridge_quantum_agent = linux_agent.LinuxBridgeQuantumAgent(
plugin = linux_agent.LinuxBridgeQuantumAgentDB(
self.br_name_prefix,
self.physical_interface,
self.polling_interval,
self.reconnect_interval,
self.root_helper,
self.v2)
self.v2,
self.db_connection)
self._linuxbridge_quantum_agent = plugin
def run_cmd(self, args):
cmd = shlex.split(self.root_helper) + args

View File

@ -0,0 +1,91 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for linuxbridge rpc
"""
import stubout
import unittest2
from quantum.common import topics
from quantum.openstack.common import context
from quantum.openstack.common import rpc
from quantum.plugins.linuxbridge.agent import linuxbridge_quantum_agent as alb
from quantum.plugins.linuxbridge import lb_quantum_plugin as plb
class rpcApiTestCase(unittest2.TestCase):
def _test_lb_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION
if rpc_method == 'cast' and method == 'run_instance':
kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_delete_network(self):
rpcapi = plb.AgentNotifierApi(topics.AGENT)
self._test_lb_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
'network_delete', rpc_method='fanout_cast',
network_id='fake_request_spec')
def test_port_update(self):
rpcapi = plb.AgentNotifierApi(topics.AGENT)
self._test_lb_api(rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
'port_update', rpc_method='fanout_cast',
port='fake_port', vlan_id='fake_vlan_id')
def test_device_details(self):
rpcapi = alb.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')
def test_update_device_down(self):
rpcapi = alb.PluginApi(topics.PLUGIN)
self._test_lb_api(rpcapi, topics.PLUGIN,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id')

View File

@ -8,5 +8,6 @@ lxml
netaddr
python-gflags==1.3
python-quantumclient>=0.1,<0.2
pyudev
sqlalchemy>0.6.4
webob==1.2.0