Merge "Introduce ip address monitor"

This commit is contained in:
Jenkins 2015-03-05 01:14:11 +00:00 committed by Gerrit Code Review
commit 10b8b085f1
8 changed files with 210 additions and 18 deletions

View File

@ -31,6 +31,9 @@ kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP
ip: IpFilter, ip, root
ip_exec: IpNetnsExecFilter, ip, root
# For ip monitor
kill_ip_monitor: KillFilter, root, ip, -9
# ovs_lib (if OVSInterfaceDriver is used)
ovs-vsctl: CommandFilter, ovs-vsctl, root

View File

@ -87,10 +87,10 @@ class AsyncProcess(object):
return utils.pid_invoked_with_cmdline(
self.pid, self.cmd_without_namespace)
def start(self, blocking=False):
def start(self, block=False):
"""Launch a process and monitor it asynchronously.
:param blocking: Block until the process has started.
:param block: Block until the process has started.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not start in time.
"""
@ -100,13 +100,13 @@ class AsyncProcess(object):
LOG.debug('Launching async process [%s].', self.cmd)
self._spawn()
if blocking:
if block:
utils.wait_until_true(self.is_active)
def stop(self, blocking=False):
def stop(self, block=False):
"""Halt the process and watcher threads.
:param blocking: Block until the process has stopped.
:param block: Block until the process has stopped.
:raises eventlet.timeout.Timeout if blocking is True and the process
did not stop in time.
"""
@ -116,7 +116,7 @@ class AsyncProcess(object):
else:
raise AsyncProcessException(_('Process is not running.'))
if blocking:
if block:
utils.wait_until_true(lambda: not self.is_active())
def _spawn(self):
@ -216,15 +216,15 @@ class AsyncProcess(object):
def _read_stderr(self):
return self._read(self._process.stderr, self._stderr_lines)
def _iter_queue(self, queue):
def _iter_queue(self, queue, block):
while True:
try:
yield queue.get_nowait()
yield queue.get(block=block)
except eventlet.queue.Empty:
break
def iter_stdout(self):
return self._iter_queue(self._stdout_lines)
def iter_stdout(self, block=False):
return self._iter_queue(self._stdout_lines, block)
def iter_stderr(self):
return self._iter_queue(self._stderr_lines)
def iter_stderr(self, block=False):
return self._iter_queue(self._stderr_lines, block)

View File

@ -0,0 +1,85 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import excutils
from neutron.agent.linux import async_process
from neutron.i18n import _LE
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class IPMonitorEvent(object):
def __init__(self, line, added, interface, cidr):
self.line = line
self.added = added
self.interface = interface
self.cidr = cidr
def __str__(self):
return self.line
@classmethod
def from_text(cls, line):
route = line.split()
try:
first_word = route[0]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Unable to parse route "%s"'), line)
added = (first_word != 'Deleted')
if not added:
route = route[1:]
try:
interface = route[1]
cidr = route[3]
except IndexError:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Unable to parse route "%s"'), line)
return cls(line, added, interface, cidr)
class IPMonitor(async_process.AsyncProcess):
"""Wrapper over `ip monitor address`.
To monitor and react indefinitely:
m = IPMonitor(namespace='tmp')
m.start()
for iterable in m:
event = IPMonitorEvent.from_text(iterable)
print event, event.added, event.interface, event.cidr
"""
def __init__(self,
namespace=None,
respawn_interval=None):
super(IPMonitor, self).__init__(['ip', '-o', 'monitor', 'address'],
run_as_root=True,
respawn_interval=respawn_interval,
namespace=namespace)
def __iter__(self):
return self.iter_stdout(block=True)
def start(self):
super(IPMonitor, self).start(block=True)
def stop(self):
super(IPMonitor, self).stop(block=True)

View File

@ -50,9 +50,9 @@ class TestAsyncProcess(AsyncProcessTestFramework):
proc = async_process.AsyncProcess(['tail', '-f',
self.test_file_path])
self.addCleanup(self._safe_stop, proc)
proc.start(blocking=True)
proc.start(block=True)
self._check_stdout(proc)
proc.stop(blocking=True)
proc.stop(block=True)
# Ensure that the process and greenthreads have stopped
proc._process.wait()

View File

@ -0,0 +1,67 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_monitor
from neutron.tests.functional.agent.linux import test_ip_lib
class TestIPMonitor(test_ip_lib.IpLibTestFramework):
def setUp(self):
super(TestIPMonitor, self).setUp()
attr = self.generate_device_details()
self.device = self.manage_device(attr)
self.monitor = ip_monitor.IPMonitor(attr.namespace)
self.addCleanup(self._safe_stop_monitor)
def _safe_stop_monitor(self):
try:
self.monitor.stop()
except async_process.AsyncProcessException:
pass
def test_ip_monitor_lifecycle(self):
self.assertFalse(self.monitor.is_active())
self.monitor.start()
self.assertTrue(self.monitor.is_active())
self.monitor.stop()
self.assertFalse(self.monitor.is_active())
def test_ip_monitor_events(self):
self.monitor.start()
cidr = '169.254.128.1/24'
self.device.addr.add(4, cidr, '169.254.128.255')
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=True,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
self.device.addr.delete(4, cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=False,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
def _assert_event(self,
expected_name,
expected_cidr,
expected_added,
event):
self.assertEqual(expected_name, event.interface)
self.assertEqual(expected_added, event.added)
self.assertEqual(expected_cidr, event.cidr)

View File

@ -24,7 +24,7 @@ class TestPIDHelpers(test_async_process.AsyncProcessTestFramework):
def test_get_cmdline_from_pid_and_pid_invoked_with_cmdline(self):
cmd = ['tail', '-f', self.test_file_path]
proc = async_process.AsyncProcess(cmd)
proc.start(blocking=True)
proc.start(block=True)
self.addCleanup(proc.stop)
pid = proc.pid

View File

@ -128,13 +128,14 @@ class TestAsyncProcess(base.BaseTestCase):
mock_start.assert_called_once_with()
def test__iter_queue_returns_empty_list_for_empty_queue(self):
result = list(self.proc._iter_queue(eventlet.queue.LightQueue()))
result = list(self.proc._iter_queue(eventlet.queue.LightQueue(),
False))
self.assertEqual(result, [])
def test__iter_queue_returns_queued_data(self):
queue = eventlet.queue.LightQueue()
queue.put('foo')
result = list(self.proc._iter_queue(queue))
result = list(self.proc._iter_queue(queue, False))
self.assertEqual(result, ['foo'])
def _test_iter_output_calls_iter_queue_on_output_queue(self, output_type):
@ -146,7 +147,7 @@ class TestAsyncProcess(base.BaseTestCase):
self.assertEqual(value, expected_value)
queue = getattr(self.proc, '_%s_lines' % output_type, None)
mock_iter_queue.assert_called_with(queue)
mock_iter_queue.assert_called_with(queue, False)
def test_iter_stdout(self):
self._test_iter_output_calls_iter_queue_on_output_queue('stdout')

View File

@ -0,0 +1,36 @@
# Copyright 2015 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.agent.linux import ip_monitor
from neutron.tests import base
class TestIPMonitorEvent(base.BaseTestCase):
def test_from_text_parses_added_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'3: wlp3s0 inet 192.168.3.59/24 brd 192.168.3.255 '
'scope global dynamic wlp3s0\ valid_lft 300sec '
'preferred_lft 300sec')
self.assertEqual('wlp3s0', event.interface)
self.assertTrue(event.added)
self.assertEqual('192.168.3.59/24', event.cidr)
def test_from_text_parses_deleted_line(self):
event = ip_monitor.IPMonitorEvent.from_text(
'Deleted 1: lo inet 127.0.0.2/8 scope host secondary lo\''
' valid_lft forever preferred_lft forever')
self.assertEqual('lo', event.interface)
self.assertFalse(event.added)
self.assertEqual('127.0.0.2/8', event.cidr)