Process update_network in the openvswitch agent
This will allow ports with attributes related to the network to be updated as necessary. Initially QoS extension which is able to attach a network policy to the port. Another approach would be sending updates to every single port on a network, but that doesn't scale well for networks with lots of ports. Change-Id: Ie28297840b5702a920142af02dd17b10775d76ca Partially-Implements: blueprint ml2-qos Closes-Bug: 1486028
This commit is contained in:
parent
5282e80941
commit
b292e197ff
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import hashlib
|
||||
import signal
|
||||
import sys
|
||||
|
@ -128,7 +129,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
# 1.1 Support Security Group RPC
|
||||
# 1.2 Support DVR (Distributed Virtual Router) RPC
|
||||
# 1.3 Added param devices_to_update to security_groups_provider_updated
|
||||
target = oslo_messaging.Target(version='1.3')
|
||||
# 1.4 Added support for network_update
|
||||
target = oslo_messaging.Target(version='1.4')
|
||||
|
||||
def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
|
||||
bridge_mappings, polling_interval, tunnel_types=None,
|
||||
|
@ -226,6 +228,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self.updated_ports = set()
|
||||
# Stores port delete notifications
|
||||
self.deleted_ports = set()
|
||||
|
||||
self.network_ports = collections.defaultdict(set)
|
||||
# keeps association between ports and ofports to detect ofport change
|
||||
self.vifname_to_ofport_map = {}
|
||||
self.setup_rpc()
|
||||
|
@ -361,7 +365,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
[constants.TUNNEL, topics.UPDATE],
|
||||
[constants.TUNNEL, topics.DELETE],
|
||||
[topics.SECURITY_GROUP, topics.UPDATE],
|
||||
[topics.DVR, topics.UPDATE]]
|
||||
[topics.DVR, topics.UPDATE],
|
||||
[topics.NETWORK, topics.UPDATE]]
|
||||
if self.l2_pop:
|
||||
consumers.append([topics.L2POPULATION,
|
||||
topics.UPDATE, self.conf.host])
|
||||
|
@ -394,8 +399,27 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
def port_delete(self, context, **kwargs):
|
||||
port_id = kwargs.get('port_id')
|
||||
self.deleted_ports.add(port_id)
|
||||
self.updated_ports.discard(port_id)
|
||||
LOG.debug("port_delete message processed for port %s", port_id)
|
||||
|
||||
def network_update(self, context, **kwargs):
|
||||
network_id = kwargs['network']['id']
|
||||
for port_id in self.network_ports[network_id]:
|
||||
# notifications could arrive out of order, if the port is deleted
|
||||
# we don't want to update it anymore
|
||||
if port_id not in self.deleted_ports:
|
||||
self.updated_ports.add(port_id)
|
||||
LOG.debug("network_update message processed for network "
|
||||
"%(network_id)s, with ports: %(ports)s",
|
||||
{'network_id': network_id,
|
||||
'ports': self.network_ports[network_id]})
|
||||
|
||||
def _clean_network_ports(self, port_id):
|
||||
for port_set in self.network_ports.values():
|
||||
if port_id in port_set:
|
||||
port_set.remove(port_id)
|
||||
break
|
||||
|
||||
def process_deleted_ports(self, port_info):
|
||||
# don't try to process removed ports as deleted ports since
|
||||
# they are already gone
|
||||
|
@ -407,6 +431,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
# longer have access to the network
|
||||
self.sg_agent.remove_devices_filter([port_id])
|
||||
port = self.int_br.get_vif_port_by_id(port_id)
|
||||
self._clean_network_ports(port_id)
|
||||
self.ext_manager.delete_port(self.context,
|
||||
{"vif_port": port,
|
||||
"port_id": port_id})
|
||||
|
@ -1297,7 +1322,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
has_sgs = 'security_groups' in details
|
||||
if not port_security or not has_sgs:
|
||||
security_disabled_devices.append(device)
|
||||
|
||||
self._update_port_network(details['port_id'],
|
||||
details['network_id'])
|
||||
self.ext_manager.handle_port(self.context, details)
|
||||
else:
|
||||
LOG.warn(_LW("Device %s not defined on plugin"), device)
|
||||
|
@ -1305,6 +1331,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||
self.port_dead(port)
|
||||
return skipped_devices, need_binding_devices, security_disabled_devices
|
||||
|
||||
def _update_port_network(self, port_id, network_id):
|
||||
self._clean_network_ports(port_id)
|
||||
self.network_ports[network_id].add(port_id)
|
||||
|
||||
def treat_ancillary_devices_added(self, devices):
|
||||
devices_details_list = (
|
||||
self.plugin_rpc.get_devices_details_list_and_failed_devices(
|
||||
|
|
|
@ -665,6 +665,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self._process_l3_update(context, updated_network, net_data)
|
||||
self.type_manager.extend_network_dict_provider(context,
|
||||
updated_network)
|
||||
|
||||
# TODO(QoS): Move out to the extension framework somehow.
|
||||
need_network_update_notify = (
|
||||
qos_consts.QOS_POLICY_ID in net_data and
|
||||
original_network[qos_consts.QOS_POLICY_ID] !=
|
||||
updated_network[qos_consts.QOS_POLICY_ID])
|
||||
|
||||
mech_context = driver_context.NetworkContext(
|
||||
self, context, updated_network,
|
||||
original_network=original_network)
|
||||
|
@ -675,6 +682,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
# now the error is propogated to the caller, which is expected to
|
||||
# either undo/retry the operation or delete the resource.
|
||||
self.mechanism_manager.update_network_postcommit(mech_context)
|
||||
if need_network_update_notify:
|
||||
self.notifier.network_update(context, updated_network)
|
||||
return updated_network
|
||||
|
||||
def get_network(self, context, id, fields=None):
|
||||
|
|
|
@ -279,7 +279,7 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
|||
1.0 - Initial version.
|
||||
1.1 - Added get_active_networks_info, create_dhcp_port,
|
||||
update_dhcp_port, and removed get_dhcp_port methods.
|
||||
|
||||
1.4 - Added network_update
|
||||
"""
|
||||
|
||||
def __init__(self, topic):
|
||||
|
@ -293,6 +293,9 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
|||
self.topic_port_delete = topics.get_topic_name(topic,
|
||||
topics.PORT,
|
||||
topics.DELETE)
|
||||
self.topic_network_update = topics.get_topic_name(topic,
|
||||
topics.NETWORK,
|
||||
topics.UPDATE)
|
||||
|
||||
target = oslo_messaging.Target(topic=topic, version='1.0')
|
||||
self.client = n_rpc.get_client(target)
|
||||
|
@ -314,3 +317,8 @@ class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin,
|
|||
cctxt = self.client.prepare(topic=self.topic_port_delete,
|
||||
fanout=True)
|
||||
cctxt.cast(context, 'port_delete', port_id=port_id)
|
||||
|
||||
def network_update(self, context, network):
|
||||
cctxt = self.client.prepare(topic=self.topic_network_update,
|
||||
fanout=True, version='1.4')
|
||||
cctxt.cast(context, 'network_update', network=network)
|
||||
|
|
|
@ -43,6 +43,13 @@ FAKE_MAC = '00:11:22:33:44:55'
|
|||
FAKE_IP1 = '10.0.0.1'
|
||||
FAKE_IP2 = '10.0.0.2'
|
||||
|
||||
TEST_PORT_ID1 = 'port-id-1'
|
||||
TEST_PORT_ID2 = 'port-id-2'
|
||||
TEST_PORT_ID3 = 'port-id-3'
|
||||
|
||||
TEST_NETWORK_ID1 = 'net-id-1'
|
||||
TEST_NETWORK_ID2 = 'net-id-2'
|
||||
|
||||
|
||||
class FakeVif(object):
|
||||
ofport = 99
|
||||
|
@ -629,15 +636,67 @@ class TestOvsNeutronAgent(object):
|
|||
self.agent.agent_state, True)
|
||||
|
||||
def test_port_update(self):
|
||||
port = {"id": "123",
|
||||
"network_id": "124",
|
||||
port = {"id": TEST_PORT_ID1,
|
||||
"network_id": TEST_NETWORK_ID1,
|
||||
"admin_state_up": False}
|
||||
self.agent.port_update("unused_context",
|
||||
port=port,
|
||||
network_type="vlan",
|
||||
segmentation_id="1",
|
||||
physical_network="physnet")
|
||||
self.assertEqual(set(['123']), self.agent.updated_ports)
|
||||
self.assertEqual(set([TEST_PORT_ID1]), self.agent.updated_ports)
|
||||
|
||||
def test_port_delete_after_update(self):
|
||||
"""Make sure a port is not marked for delete and update."""
|
||||
port = {'id': TEST_PORT_ID1}
|
||||
|
||||
self.agent.port_update(context=None, port=port)
|
||||
self.agent.port_delete(context=None, port_id=port['id'])
|
||||
self.assertEqual(set(), self.agent.updated_ports)
|
||||
self.assertEqual(set([port['id']]), self.agent.deleted_ports)
|
||||
|
||||
def test_process_deleted_ports_cleans_network_ports(self):
|
||||
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
|
||||
self.agent.port_delete(context=None, port_id=TEST_PORT_ID1)
|
||||
self.agent.sg_agent = mock.Mock()
|
||||
self.agent.int_br = mock.Mock()
|
||||
self.agent.process_deleted_ports(port_info={})
|
||||
self.assertEqual(set(), self.agent.network_ports[TEST_NETWORK_ID1])
|
||||
|
||||
def test_network_update(self):
|
||||
"""Network update marks port for update. """
|
||||
network = {'id': TEST_NETWORK_ID1}
|
||||
port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
|
||||
|
||||
self.agent._update_port_network(port['id'], port['network_id'])
|
||||
self.agent.network_update(context=None, network=network)
|
||||
self.assertEqual(set([port['id']]), self.agent.updated_ports)
|
||||
|
||||
def test_network_update_outoforder(self):
|
||||
"""Network update arrives later than port_delete.
|
||||
|
||||
But the main agent loop still didn't process the ports,
|
||||
so we ensure the port is not marked for update.
|
||||
"""
|
||||
network = {'id': TEST_NETWORK_ID1}
|
||||
port = {'id': TEST_PORT_ID1, 'network_id': network['id']}
|
||||
|
||||
self.agent._update_port_network(port['id'], port['network_id'])
|
||||
self.agent.port_delete(context=None, port_id=port['id'])
|
||||
self.agent.network_update(context=None, network=network)
|
||||
self.assertEqual(set(), self.agent.updated_ports)
|
||||
|
||||
def test_update_port_network(self):
|
||||
"""Ensure ports are associated and moved across networks correctly."""
|
||||
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID1)
|
||||
self.agent._update_port_network(TEST_PORT_ID2, TEST_NETWORK_ID1)
|
||||
self.agent._update_port_network(TEST_PORT_ID3, TEST_NETWORK_ID2)
|
||||
self.agent._update_port_network(TEST_PORT_ID1, TEST_NETWORK_ID2)
|
||||
|
||||
self.assertEqual(set([TEST_PORT_ID2]),
|
||||
self.agent.network_ports[TEST_NETWORK_ID1])
|
||||
self.assertEqual(set([TEST_PORT_ID1, TEST_PORT_ID3]),
|
||||
self.agent.network_ports[TEST_NETWORK_ID2])
|
||||
|
||||
def test_port_delete(self):
|
||||
vif = FakeVif()
|
||||
|
|
Loading…
Reference in New Issue