Process update_network in the openvswitch agent

This will allow ports with attributes related to the network to be
updated as necessary. Initially QoS extension which is able to
attach a network policy to the port.

Another approach would be sending updates to every single port
on a network, but that doesn't scale well for networks with lots
of ports.

Change-Id: Ie28297840b5702a920142af02dd17b10775d76ca
Partially-Implements: blueprint ml2-qos
Closes-Bug: 1486028
This commit is contained in:
Miguel Angel Ajo 2015-08-21 14:40:05 +02:00
parent 5282e80941
commit b292e197ff
4 changed files with 113 additions and 7 deletions

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import hashlib
import signal
import sys
@ -128,7 +129,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# 1.1 Support Security Group RPC
# 1.2 Support DVR (Distributed Virtual Router) RPC
# 1.3 Added param devices_to_update to security_groups_provider_updated
target = oslo_messaging.Target(version='1.3')
# 1.4 Added support for network_update
target = oslo_messaging.Target(version='1.4')
def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
bridge_mappings, polling_interval, tunnel_types=None,
@ -226,6 +228,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.updated_ports = set()
# Stores port delete notifications
self.deleted_ports = set()
self.network_ports = collections.defaultdict(set)
# keeps association between ports and ofports to detect ofport change
self.vifname_to_ofport_map = {}
self.setup_rpc()
@ -361,7 +365,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
[constants.TUNNEL, topics.UPDATE],
[constants.TUNNEL, topics.DELETE],
[topics.SECURITY_GROUP, topics.UPDATE],
[topics.DVR, topics.UPDATE]]
[topics.DVR, topics.UPDATE],
[topics.NETWORK, topics.UPDATE]]
if self.l2_pop:
consumers.append([topics.L2POPULATION,
topics.UPDATE, self.conf.host])
@ -394,8 +399,27 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def port_delete(self, context, **kwargs):
port_id = kwargs.get('port_id')
self.deleted_ports.add(port_id)
self.updated_ports.discard(port_id)
LOG.debug("port_delete message processed for port %s", port_id)
def network_update(self, context, **kwargs):
network_id = kwargs['network']['id']
for port_id in self.network_ports[network_id]:
# notifications could arrive out of order, if the port is deleted
# we don't want to update it anymore
if port_id not in self.deleted_ports:
self.updated_ports.add(port_id)
LOG.debug("network_update message processed for network "
"%(network_id)s, with ports: %(ports)s",
{'network_id': network_id,
'ports': self.network_ports[network_id]})
def _clean_network_ports(self, port_id):
for port_set in self.network_ports.values():
if port_id in port_set:
port_set.remove(port_id)
break
def process_deleted_ports(self, port_info):
# don't try to process removed ports as deleted ports since
# they are already gone
@ -407,6 +431,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# longer have access to the network
self.sg_agent.remove_devices_filter([port_id])
port = self.int_br.get_vif_port_by_id(port_id)
self._clean_network_ports(port_id)
self.ext_manager.delete_port(self.context,
{"vif_port": port,
"port_id": port_id})
@ -1297,7 +1322,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
has_sgs = 'security_groups' in details
if not port_security or not has_sgs:
security_disabled_devices.append(device)
self._update_port_network(details['port_id'],
details['network_id'])
self.ext_manager.handle_port(self.context, details)
else:
LOG.warn(_LW("Device %s not defined on plugin"), device)
@ -1305,6 +1331,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.port_dead(port)
return skipped_devices, need_binding_devices, security_disabled_devices
def _update_port_network(self, port_id, network_id):
self._clean_network_ports(port_id)
self.network_ports[network_id].add(port_id)
def treat_ancillary_devices_added(self, devices):
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(

View File

@ -665,6 +665,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self._process_l3_update(context, updated_network, net_data)
self.type_manager.extend_network_dict_provider(context,
updated_network)
# TODO(QoS): Move out to the extension framework somehow.
need_network_update_notify = (
qos_consts.QOS_POLICY_ID in net_data and
original_network[qos_consts.QOS_POLICY_ID] !=
updated_network[qos_consts.QOS_POLICY_ID])
mech_context = driver_context.NetworkContext(
self, context, updated_network,
original_network=original_network)
@ -675,6 +682,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# now the error is propogated to the caller, which is expected to
# either undo/retry the operation or delete the resource.
self.mechanism_manager.update_network_postcommit(mech_context)
if need_network_update_notify:
self.notifier.network_update(context, updated_network)
return updated_network
def get_network(self, context, id, fields=None):

View File

@ -279,7 +279,7 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
1.0 - Initial version.
1.1 - Added get_active_networks_info, create_dhcp_port,
update_dhcp_port, and removed get_dhcp_port methods.
1.4 - Added network_update
"""
def __init__(self, topic):
@ -293,6 +293,9 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
self.topic_port_delete = topics.get_topic_name(topic,
topics.PORT,
topics.DELETE)
self.topic_network_update = topics.get_topic_name(topic,
topics.NETWORK,
topics.UPDATE)
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
@ -314,3 +317,8 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
cctxt = self.client.prepare(topic=self.topic_port_delete,
fanout=True)
cctxt.cast(context, 'port_delete', port_id=port_id)
def network_update(self, context, network):
cctxt = self.client.prepare(topic=self.topic_network_update,
fanout=True, version='1.4')
cctxt.cast(context, 'network_update', network=network)

View File

@ -43,6 +43,13 @@ FAKE_MAC = '00:11:22:33:44:55'
FAKE_IP1 = '10.0.0.1'
FAKE_IP2 = '10.0.0.2'
TEST_PORT_ID1 = 'port-id-1'
TEST_PORT_ID2 = 'port-id-2'
TEST_PORT_ID3 = 'port-id-3'
TEST_NETWORK_ID1 = 'net-id-1'
TEST_NETWORK_ID2 = 'net-id-2'
class FakeVif(object):
ofport = 99
@ -629,15 +636,67 @@ class TestOvsNeutronAgent(object):
self.agent.agent_state, True)
def test_port_update(self):
port = {"id": "123",
"network_id": "124",
port = {"id": TEST_PORT_ID1,
"network_id": TEST_NETWORK_ID1,
"admin_state_up": False}
self.agent.port_update("unused_context",
port=port,
network_type="vlan",
segmentation_id="1",
physical_network="physnet")
self.assertEqual(set(['123']), self.agent.updated_ports)
self.assertEqual(set([TEST_PORT_ID1]), self.agent.updated_ports)
def test_port_delete_after_update(self):
"""Make sure a port is not marked for delete and update."""
port = {'id': TEST_PORT_ID1}
self.agent.port_update(context=None, port=port)
self.agent.port_delete(context=None, port_id=port['id'])
self.assertEqual(set(), self.agent.updated_ports)
self.assertEqual(set([port['id']]), self.agent.deleted_ports)
def test_process_deleted_ports_cleans_network_ports(self):
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
self.agent.port_delete(context=None, port_id=TEST_PORT_ID1)
self.agent.sg_agent = mock.Mock()
self.agent.int_br = mock.Mock()
self.agent.process_deleted_ports(port_info={})
self.assertEqual(set(), self.agent.network_ports[TEST_NETWORK_ID1])
def test_network_update(self):
"""Network update marks port for update. """
network = {'id': TEST_NETWORK_ID1}
port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
self.agent._update_port_network(port['id'], port['network_id'])
self.agent.network_update(context=None, network=network)
self.assertEqual(set([port['id']]), self.agent.updated_ports)
def test_network_update_outoforder(self):
"""Network update arrives later than port_delete.
But the main agent loop still didn't process the ports,
so we ensure the port is not marked for update.
"""
network = {'id': TEST_NETWORK_ID1}
port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
self.agent._update_port_network(port['id'], port['network_id'])
self.agent.port_delete(context=None, port_id=port['id'])
self.agent.network_update(context=None, network=network)
self.assertEqual(set(), self.agent.updated_ports)
def test_update_port_network(self):
"""Ensure ports are associated and moved across networks correctly."""
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
self.agent._update_port_network(TEST_PORT_ID2, TEST_NETWORK_ID1)
self.agent._update_port_network(TEST_PORT_ID3, TEST_NETWORK_ID2)
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID2)
self.assertEqual(set([TEST_PORT_ID2]),
self.agent.network_ports[TEST_NETWORK_ID1])
self.assertEqual(set([TEST_PORT_ID1, TEST_PORT_ID3]),
self.agent.network_ports[TEST_NETWORK_ID2])
def test_port_delete(self):
vif = FakeVif()