Add get_events to OVSDB monitor
OVSDB monitor can generate the events that the OVS agent needs to process (device added or updated). Instead of notifying only that a change occurred and that polling is needed, pass the events to the agent Change-Id: I3d17bf995ad4508c4c6d089de550148da1465fa1 Partially-Implements: blueprint restructure-l2-agent
This commit is contained in:
parent
b06b4cafc5
commit
725543684c
|
@ -14,13 +14,19 @@
|
|||
|
||||
import eventlet
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron.agent.linux import async_process
|
||||
from neutron.agent.ovsdb import api as ovsdb
|
||||
from neutron.i18n import _LE
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
OVSDB_ACTION_INITIAL = 'initial'
|
||||
OVSDB_ACTION_INSERT = 'insert'
|
||||
OVSDB_ACTION_DELETE = 'delete'
|
||||
|
||||
|
||||
class OvsdbMonitor(async_process.AsyncProcess):
|
||||
"""Manages an invocation of 'ovsdb-client monitor'."""
|
||||
|
@ -63,22 +69,50 @@ class SimpleInterfaceMonitor(OvsdbMonitor):
|
|||
def __init__(self, respawn_interval=None):
|
||||
super(SimpleInterfaceMonitor, self).__init__(
|
||||
'Interface',
|
||||
columns=['name', 'ofport'],
|
||||
columns=['name', 'ofport', 'external_ids'],
|
||||
format='json',
|
||||
respawn_interval=respawn_interval,
|
||||
)
|
||||
self.data_received = False
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
|
||||
@property
|
||||
def has_updates(self):
|
||||
"""Indicate whether the ovsdb Interface table has been updated.
|
||||
|
||||
True will be returned if the monitor process is not active.
|
||||
This 'failing open' minimizes the risk of falsely indicating
|
||||
the absence of updates at the expense of potential false
|
||||
positives.
|
||||
If the monitor process is not active an error will be logged since
|
||||
it won't be able to communicate any update. This situation should be
|
||||
temporary if respawn_interval is set.
|
||||
"""
|
||||
return bool(list(self.iter_stdout())) or not self.is_active()
|
||||
if not self.is_active():
|
||||
LOG.error(_LE("Interface monitor is not active"))
|
||||
else:
|
||||
self.process_events()
|
||||
return bool(self.new_events['added'] or self.new_events['removed'])
|
||||
|
||||
def get_events(self):
|
||||
self.process_events()
|
||||
events = self.new_events
|
||||
self.new_events = {'added': [], 'removed': []}
|
||||
return events
|
||||
|
||||
def process_events(self):
|
||||
devices_added = []
|
||||
devices_removed = []
|
||||
for row in self.iter_stdout():
|
||||
json = jsonutils.loads(row).get('data')
|
||||
for ovs_id, action, name, ofport, external_ids in json:
|
||||
if external_ids:
|
||||
external_ids = ovsdb.val_to_py(external_ids)
|
||||
device = {'name': name,
|
||||
'ofport': ofport,
|
||||
'external_ids': external_ids}
|
||||
if action in (OVSDB_ACTION_INITIAL, OVSDB_ACTION_INSERT):
|
||||
devices_added.append(device)
|
||||
elif action == OVSDB_ACTION_DELETE:
|
||||
devices_removed.append(device)
|
||||
self.new_events['added'].extend(devices_added)
|
||||
self.new_events['removed'].extend(devices_removed)
|
||||
|
||||
def start(self, block=False, timeout=5):
|
||||
super(SimpleInterfaceMonitor, self).start()
|
||||
|
|
|
@ -60,3 +60,6 @@ class InterfacePollingMinimizer(base_polling.BasePollingManager):
|
|||
# collect output.
|
||||
eventlet.sleep()
|
||||
return self._monitor.has_updates
|
||||
|
||||
def get_events(self):
|
||||
return self._monitor.get_events()
|
||||
|
|
|
@ -181,6 +181,12 @@ class OVSBridgeFixture(fixtures.Fixture):
|
|||
|
||||
class OVSPortFixture(PortFixture):
|
||||
|
||||
def __init__(self, bridge=None, namespace=None, attrs=None):
|
||||
super(OVSPortFixture, self).__init__(bridge, namespace)
|
||||
if attrs is None:
|
||||
attrs = []
|
||||
self.attrs = attrs
|
||||
|
||||
def _create_bridge_fixture(self):
|
||||
return OVSBridgeFixture()
|
||||
|
||||
|
@ -196,7 +202,8 @@ class OVSPortFixture(PortFixture):
|
|||
self.port.link.set_up()
|
||||
|
||||
def create_port(self, name):
|
||||
self.bridge.add_port(name, ('type', 'internal'))
|
||||
self.attrs.insert(0, ('type', 'internal'))
|
||||
self.bridge.add_port(name, *self.attrs)
|
||||
return name
|
||||
|
||||
|
||||
|
|
|
@ -107,9 +107,51 @@ class TestSimpleInterfaceMonitor(BaseMonitorTest):
|
|||
utils.wait_until_true(lambda: self.monitor.data_received is True)
|
||||
self.assertTrue(self.monitor.has_updates,
|
||||
'Initial call should always be true')
|
||||
self.assertFalse(self.monitor.has_updates,
|
||||
'has_updates without port addition should be False')
|
||||
# clear the event list
|
||||
self.monitor.get_events()
|
||||
self.useFixture(net_helpers.OVSPortFixture())
|
||||
# has_updates after port addition should become True
|
||||
while not self.monitor.has_updates:
|
||||
eventlet.sleep(0.01)
|
||||
utils.wait_until_true(lambda: self.monitor.has_updates is True)
|
||||
|
||||
def _expected_devices_events(self, devices, state):
|
||||
"""Helper to check that events are received for expected devices.
|
||||
|
||||
:param devices: The list of expected devices. WARNING: This list
|
||||
is modified by this method
|
||||
:param state: The state of the devices (added or removed)
|
||||
"""
|
||||
events = self.monitor.get_events()
|
||||
event_devices = [
|
||||
(dev['name'], dev['external_ids']) for dev in events.get(state)]
|
||||
for dev in event_devices:
|
||||
if dev[0] in devices:
|
||||
devices.remove(dev[0])
|
||||
self.assertEqual(dev[1].get('iface-status'), 'active')
|
||||
if not devices:
|
||||
return True
|
||||
|
||||
def test_get_events(self):
|
||||
utils.wait_until_true(lambda: self.monitor.data_received is True)
|
||||
devices = self.monitor.get_events()
|
||||
self.assertTrue(devices.get('added'),
|
||||
'Initial call should always be true')
|
||||
p_attrs = [('external_ids', {'iface-status': 'active'})]
|
||||
br = self.useFixture(net_helpers.OVSBridgeFixture())
|
||||
p1 = self.useFixture(net_helpers.OVSPortFixture(
|
||||
br.bridge, None, p_attrs))
|
||||
p2 = self.useFixture(net_helpers.OVSPortFixture(
|
||||
br.bridge, None, p_attrs))
|
||||
added_devices = [p1.port.name, p2.port.name]
|
||||
utils.wait_until_true(
|
||||
lambda: self._expected_devices_events(added_devices, 'added'))
|
||||
br.bridge.delete_port(p1.port.name)
|
||||
br.bridge.delete_port(p2.port.name)
|
||||
removed_devices = [p1.port.name, p2.port.name]
|
||||
utils.wait_until_true(
|
||||
lambda: self._expected_devices_events(removed_devices, 'removed'))
|
||||
# restart
|
||||
self.monitor.stop(block=True)
|
||||
self.monitor.start(block=True, timeout=60)
|
||||
devices = self.monitor.get_events()
|
||||
self.assertTrue(devices.get('added'),
|
||||
'Initial call should always be true')
|
||||
|
|
|
@ -55,9 +55,6 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
|
|||
super(TestSimpleInterfaceMonitor, self).setUp()
|
||||
self.monitor = ovsdb_monitor.SimpleInterfaceMonitor()
|
||||
|
||||
def test_has_updates_is_true_by_default(self):
|
||||
self.assertTrue(self.monitor.has_updates)
|
||||
|
||||
def test_has_updates_is_false_if_active_with_no_output(self):
|
||||
target = ('neutron.agent.linux.ovsdb_monitor.SimpleInterfaceMonitor'
|
||||
'.is_active')
|
||||
|
@ -87,3 +84,12 @@ class TestSimpleInterfaceMonitor(base.BaseTestCase):
|
|||
return_value=output):
|
||||
self.monitor._read_stdout()
|
||||
self.assertFalse(self.monitor.data_received)
|
||||
|
||||
def test_has_updates_after_calling_get_events_is_false(self):
|
||||
with mock.patch.object(
|
||||
self.monitor, 'process_events') as process_events:
|
||||
self.monitor.new_events = {'added': ['foo'], 'removed': ['foo1']}
|
||||
self.assertTrue(self.monitor.has_updates)
|
||||
self.monitor.get_events()
|
||||
self.assertTrue(process_events.called)
|
||||
self.assertFalse(self.monitor.has_updates)
|
||||
|
|
Loading…
Reference in New Issue