diff --git a/neutron/agent/common/ovs_lib.py b/neutron/agent/common/ovs_lib.py index 798c84f556f..1da2d22cf3d 100644 --- a/neutron/agent/common/ovs_lib.py +++ b/neutron/agent/common/ovs_lib.py @@ -120,10 +120,7 @@ class BaseOVS(object): self.ovsdb.add_br(bridge_name, datapath_type).execute() - br = OVSBridge(bridge_name) - # Don't return until vswitchd sets up the internal port - br.get_port_ofport(bridge_name) - return br + return OVSBridge(bridge_name) def delete_bridge(self, bridge_name): self.ovsdb.del_br(bridge_name).execute() @@ -221,8 +218,6 @@ class OVSBridge(BaseOVS): if secure_mode: txn.add(self.ovsdb.set_fail_mode(self.br_name, FAILMODE_SECURE)) - # Don't return until vswitchd sets up the internal port - self.get_port_ofport(self.br_name) def destroy(self): self.delete_bridge(self.br_name) @@ -248,8 +243,6 @@ class OVSBridge(BaseOVS): if interface_attr_tuples: txn.add(self.ovsdb.db_set('Interface', port_name, *interface_attr_tuples)) - # Don't return until the port has been assigned by vswitchd - self.get_port_ofport(port_name) def delete_port(self, port_name): self.ovsdb.del_port(port_name, self.br_name).execute() diff --git a/neutron/agent/ovsdb/impl_idl.py b/neutron/agent/ovsdb/impl_idl.py index 2aa5dda0a6e..9cc93a6fd4f 100644 --- a/neutron/agent/ovsdb/impl_idl.py +++ b/neutron/agent/ovsdb/impl_idl.py @@ -14,13 +14,14 @@ import time +from neutron_lib import exceptions from oslo_config import cfg from oslo_log import log as logging from oslo_utils import excutils from ovs.db import idl from six.moves import queue as Queue -from neutron._i18n import _ +from neutron._i18n import _, _LE from neutron.agent.ovsdb import api from neutron.agent.ovsdb.native import commands as cmd from neutron.agent.ovsdb.native import connection @@ -32,9 +33,13 @@ cfg.CONF.import_opt('ovs_vsctl_timeout', 'neutron.agent.common.ovs_lib') LOG = logging.getLogger(__name__) +class VswitchdInterfaceAddException(exceptions.NeutronException): + message = _("Failed to add interfaces: %(ifaces)s") + + class Transaction(api.Transaction): def __init__(self, api, ovsdb_connection, timeout, - check_error=False, log_errors=False): + check_error=False, log_errors=True): self.api = api self.check_error = check_error self.log_errors = log_errors @@ -42,6 +47,10 @@ class Transaction(api.Transaction): self.results = Queue.Queue(1) self.ovsdb_connection = ovsdb_connection self.timeout = timeout + self.expected_ifaces = set() + + def __str__(self): + return ", ".join(str(cmd) for cmd in self.commands) def add(self, command): """Add a command to the transaction @@ -61,23 +70,29 @@ class Transaction(api.Transaction): _("Commands %(commands)s exceeded timeout %(timeout)d " "seconds") % {'commands': self.commands, 'timeout': self.timeout}) - if self.check_error: - if isinstance(result, idlutils.ExceptionResult): - if self.log_errors: - LOG.error(result.tb) + if isinstance(result, idlutils.ExceptionResult): + if self.log_errors: + LOG.error(result.tb) + if self.check_error: raise result.ex return result + def pre_commit(self, txn): + pass + + def post_commit(self, txn): + pass + def do_commit(self): - start_time = time.time() + self.start_time = time.time() attempts = 0 while True: - elapsed_time = time.time() - start_time - if attempts > 0 and elapsed_time > self.timeout: + if attempts > 0 and self.timeout_exceeded(): raise RuntimeError("OVS transaction timed out") attempts += 1 # TODO(twilson) Make sure we don't loop longer than vsctl_timeout txn = idl.Transaction(self.api.idl) + self.pre_commit(txn) for i, command in enumerate(self.commands): LOG.debug("Running txn command(idx=%(idx)s): %(cmd)s", {'idx': i, 'cmd': command}) @@ -92,9 +107,8 @@ class Transaction(api.Transaction): status = txn.commit_block() if status == txn.TRY_AGAIN: LOG.debug("OVSDB transaction returned TRY_AGAIN, retrying") - idlutils.wait_for_change( - self.api.idl, self.timeout - elapsed_time, - seqno) + idlutils.wait_for_change(self.api.idl, self.time_remaining(), + seqno) continue elif status == txn.ERROR: msg = _("OVSDB Error: %s") % txn.get_error() @@ -109,9 +123,67 @@ class Transaction(api.Transaction): return elif status == txn.UNCHANGED: LOG.debug("Transaction caused no change") + elif status == txn.SUCCESS: + self.post_commit(txn) return [cmd.result for cmd in self.commands] + def elapsed_time(self): + return time.time() - self.start_time + + def time_remaining(self): + return self.timeout - self.elapsed_time() + + def timeout_exceeded(self): + return self.elapsed_time() > self.timeout + + +class NeutronOVSDBTransaction(Transaction): + def pre_commit(self, txn): + self.api._ovs.increment('next_cfg') + txn.expected_ifaces = set() + + def post_commit(self, txn): + # ovs-vsctl only logs these failures and does not return nonzero + try: + self.do_post_commit(txn) + except Exception: + LOG.exception(_LE("Post-commit checks failed")) + + def do_post_commit(self, txn): + next_cfg = txn.get_increment_new_value() + while not self.timeout_exceeded(): + self.api.idl.run() + if self.vswitchd_has_completed(next_cfg): + failed = self.post_commit_failed_interfaces(txn) + if failed: + raise VswitchdInterfaceAddException( + ifaces=", ".join(failed)) + break + self.ovsdb_connection.poller.timer_wait( + self.time_remaining() * 1000) + self.api.idl.wait(self.ovsdb_connection.poller) + self.ovsdb_connection.poller.block() + else: + raise api.TimeoutException( + _("Commands %(commands)s exceeded timeout %(timeout)d " + "seconds post-commit") % {'commands': self.commands, + 'timeout': self.timeout}) + + def post_commit_failed_interfaces(self, txn): + failed = [] + for iface_uuid in txn.expected_ifaces: + uuid = txn.get_insert_uuid(iface_uuid) + if uuid: + ifaces = self.api.idl.tables['Interface'] + iface = ifaces.rows.get(uuid) + if iface and (not iface.ofport or iface.ofport == -1): + failed.append(iface.name) + return failed + + def vswitchd_has_completed(self, next_cfg): + return self.api._ovs.cur_cfg >= next_cfg + class OvsdbIdl(api.API): @@ -133,9 +205,9 @@ class OvsdbIdl(api.API): return list(self._tables['Open_vSwitch'].rows.values())[0] def transaction(self, check_error=False, log_errors=True, **kwargs): - return Transaction(self, OvsdbIdl.ovsdb_connection, - self.context.vsctl_timeout, - check_error, log_errors) + return NeutronOVSDBTransaction(self, OvsdbIdl.ovsdb_connection, + self.context.vsctl_timeout, + check_error, log_errors) def add_br(self, name, may_exist=True, datapath_type=None): return cmd.AddBridgeCommand(self, name, may_exist, datapath_type) diff --git a/neutron/agent/ovsdb/native/commands.py b/neutron/agent/ovsdb/native/commands.py index 97649342fdc..ad49b7cd331 100644 --- a/neutron/agent/ovsdb/native/commands.py +++ b/neutron/agent/ovsdb/native/commands.py @@ -303,6 +303,8 @@ class AddPortCommand(BaseCommand): br.ports = ports iface = txn.insert(self.api._tables['Interface']) + # NOTE(twilson) The OVS lib's __getattr__ breaks iface.uuid here + txn.expected_ifaces.add(iface.__dict__['uuid']) iface.name = self.port port.verify('interfaces') ifaces = getattr(port, 'interfaces', []) diff --git a/neutron/tests/functional/agent/ovsdb/__init__.py b/neutron/tests/functional/agent/ovsdb/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/functional/agent/ovsdb/test_impl_idl.py b/neutron/tests/functional/agent/ovsdb/test_impl_idl.py new file mode 100644 index 00000000000..a48c1a45b59 --- /dev/null +++ b/neutron/tests/functional/agent/ovsdb/test_impl_idl.py @@ -0,0 +1,75 @@ +# Copyright (c) 2016 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.agent.common import ovs_lib +from neutron.agent.ovsdb import api +from neutron.agent.ovsdb import impl_idl +from neutron.tests import base as test_base +from neutron.tests.common import net_helpers +from neutron.tests.functional import base + + +# NOTE(twilson) functools.partial does not work for this +def trpatch(*args, **kwargs): + def wrapped(fn): + return mock.patch.object(impl_idl.NeutronOVSDBTransaction, + *args, **kwargs)(fn) + return wrapped + + +class ImplIdlTestCase(base.BaseSudoTestCase): + def setUp(self): + super(ImplIdlTestCase, self).setUp() + self.config(group='OVS', ovsdb_interface='native') + self.ovs = ovs_lib.BaseOVS() + self.brname = test_base.get_rand_device_name(net_helpers.BR_PREFIX) + # Make sure exceptions pass through by calling do_post_commit directly + mock.patch.object( + impl_idl.NeutronOVSDBTransaction, "post_commit", + side_effect=impl_idl.NeutronOVSDBTransaction.do_post_commit, + autospec=True).start() + + def _add_br(self): + # NOTE(twilson) we will be raising exceptions with add_br, so schedule + # cleanup before that. + self.addCleanup(self.ovs.delete_bridge, self.brname) + ovsdb = self.ovs.ovsdb + with ovsdb.transaction(check_error=True) as tr: + tr.add(ovsdb.add_br(self.brname)) + return tr + + def _add_br_and_test(self): + self._add_br() + ofport = self.ovs.db_get_val("Interface", self.brname, "ofport") + self.assertTrue(int(ofport)) + self.assertTrue(ofport > -1) + + def test_post_commit_vswitchd_completed_no_failures(self): + self._add_br_and_test() + + @trpatch("vswitchd_has_completed", return_value=True) + @trpatch("post_commit_failed_interfaces", return_value=["failed_if1"]) + @trpatch("timeout_exceeded", return_value=False) + def test_post_commit_vswitchd_completed_failures(self, *args): + self.assertRaises(impl_idl.VswitchdInterfaceAddException, self._add_br) + + @trpatch("vswitchd_has_completed", return_value=False) + def test_post_commit_vswitchd_incomplete_timeout(self, *args): + # Due to timing issues we may rarely hit the global timeout, which + # raises RuntimeError to match the vsctl implementation + self.ovs.vsctl_timeout = 3 + self.assertRaises((api.TimeoutException, RuntimeError), self._add_br) diff --git a/neutron/tests/unit/agent/ovsdb/test_impl_idl.py b/neutron/tests/unit/agent/ovsdb/test_impl_idl.py index 3e4e927cbe9..31531a8afb4 100644 --- a/neutron/tests/unit/agent/ovsdb/test_impl_idl.py +++ b/neutron/tests/unit/agent/ovsdb/test_impl_idl.py @@ -31,7 +31,15 @@ from neutron.tests import base class TransactionTestCase(base.BaseTestCase): def test_commit_raises_exception_on_timeout(self): with mock.patch.object(queue, 'Queue') as mock_queue: - transaction = impl_idl.Transaction(mock.sentinel, mock.Mock(), 0) + transaction = impl_idl.NeutronOVSDBTransaction(mock.sentinel, + mock.Mock(), 0) mock_queue.return_value.get.side_effect = queue.Empty with testtools.ExpectedException(api.TimeoutException): transaction.commit() + + def test_post_commit_does_not_raise_exception(self): + with mock.patch.object(impl_idl.NeutronOVSDBTransaction, + "do_post_commit", side_effect=Exception): + transaction = impl_idl.NeutronOVSDBTransaction(mock.sentinel, + mock.Mock(), 0) + transaction.post_commit(mock.Mock())