diff --git a/ovsdbapp/backend/ovs_idl/connection.py b/ovsdbapp/backend/ovs_idl/connection.py index fe59cf66..d1c3621f 100644 --- a/ovsdbapp/backend/ovs_idl/connection.py +++ b/ovsdbapp/backend/ovs_idl/connection.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import logging import os import threading import traceback @@ -21,12 +22,15 @@ from ovs import poller from six.moves import queue as Queue from ovsdbapp.backend.ovs_idl import idlutils +from ovsdbapp import exceptions if os.name == 'nt': from ovsdbapp.backend.ovs_idl.windows import connection_utils else: from ovsdbapp.backend.ovs_idl.linux import connection_utils +LOG = logging.getLogger(__name__) + class TransactionQueue(Queue.Queue, object): def __init__(self, *args, **kwargs): @@ -85,6 +89,7 @@ class Connection(object): self.thread.start() def run(self): + errors = 0 while self._is_running: self.idl.wait(self.poller) self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN) @@ -92,7 +97,25 @@ class Connection(object): # is solved. self.poller.timer_wait(self.timeout * 1000) self.poller.block() - self.idl.run() + # If we fail on a run() call, we could have missed an update + # from the server, leaving us out of sync with ovsdb-server. + # It is not safe to continue without restarting the connection, + # though it is likely that the error is unrecoverable, so only try + # a few times before bailing completely. + try: + self.idl.run() + except Exception as e: + # This shouldn't happen, but is possible if there is a bug + # in python-ovs + errors += 1 + LOG.exception(e) + if errors <= 3: + self.idl.force_reconnect() + idlutils.wait_for_change(self.idl, self.timeout) + continue + self._is_running = False + break + errors = 0 txn = self.txns.get_nowait() if txn is not None: try: @@ -117,7 +140,11 @@ class Connection(object): def queue_txn(self, txn): # Even if we aren't started, we can queue a transaction and it will # run when we are started - self.txns.put(txn) + try: + self.txns.put(txn, timeout=self.timeout) + except Queue.Full: + raise exceptions.TimeoutException(commands=txn.commands, + timeout=self.timeout) class OvsdbIdl(idl.Idl): diff --git a/ovsdbapp/tests/functional/schema/open_vswitch/test_impl_idl.py b/ovsdbapp/tests/functional/schema/open_vswitch/test_impl_idl.py index 75d3d1d7..7b9d6107 100644 --- a/ovsdbapp/tests/functional/schema/open_vswitch/test_impl_idl.py +++ b/ovsdbapp/tests/functional/schema/open_vswitch/test_impl_idl.py @@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase): cleanup_cmd = self.api.del_br(self.brname) self.addCleanup(cleanup_cmd.execute) + def test_idl_run_exception_terminates(self): + run = self.api.idl.run + with mock.patch.object(self.api.idl, "run") as runmock: + exceptions = iter([Exception("TestException")]) + + def side_effect(): + try: + raise next(exceptions) + except StopIteration: + return run() + + runmock.side_effect = side_effect + exists = self.api.br_exists(self.brname).execute(check_error=True) + self.assertFalse(exists) + def test_br_exists_false(self): exists = self.api.br_exists(self.brname).execute(check_error=True) self.assertFalse(exists) diff --git a/ovsdbapp/tests/unit/backend/ovs_idl/test_connection.py b/ovsdbapp/tests/unit/backend/ovs_idl/test_connection.py index f02a8e66..987dee79 100644 --- a/ovsdbapp/tests/unit/backend/ovs_idl/test_connection.py +++ b/ovsdbapp/tests/unit/backend/ovs_idl/test_connection.py @@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase): def test_queue_txn(self): self.conn.start() self.conn.queue_txn('blah') - self.conn.txns.put.assert_called_once_with('blah') + self.conn.txns.put.assert_called_once_with('blah', + timeout=self.conn.timeout) class TestTransactionQueue(base.TestCase):