Ensure timeout on queueing transaction

If idl.run() somehow throws an Exception, the existing code
would cause the Connection thread to die and then if more than
one transaction was queued, it would hang forever. Instead, we
enforce timeout on queing a transaction and log the exception
that happens in the Connection thread to make debugging easier
and try reconnecting on the off chance that the exception is
recoverable.

Change-Id: I14313df9a59424787230b3fbd630d925323969fe
This commit is contained in:
Terry Wilson 2018-04-26 15:44:48 -05:00
parent f15d014fc7
commit c3d639ab45
3 changed files with 46 additions and 3 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import threading
import traceback
@ -21,12 +22,15 @@ from ovs import poller
from six.moves import queue as Queue
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import exceptions
if os.name == 'nt':
from ovsdbapp.backend.ovs_idl.windows import connection_utils
else:
from ovsdbapp.backend.ovs_idl.linux import connection_utils
LOG = logging.getLogger(__name__)
class TransactionQueue(Queue.Queue, object):
def __init__(self, *args, **kwargs):
@ -85,6 +89,7 @@ class Connection(object):
self.thread.start()
def run(self):
errors = 0
while self._is_running:
self.idl.wait(self.poller)
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
@ -92,7 +97,25 @@ class Connection(object):
# is solved.
self.poller.timer_wait(self.timeout * 1000)
self.poller.block()
self.idl.run()
# If we fail on a run() call, we could have missed an update
# from the server, leaving us out of sync with ovsdb-server.
# It is not safe to continue without restarting the connection,
# though it is likely that the error is unrecoverable, so only try
# a few times before bailing completely.
try:
self.idl.run()
except Exception as e:
# This shouldn't happen, but is possible if there is a bug
# in python-ovs
errors += 1
LOG.exception(e)
if errors <= 3:
self.idl.force_reconnect()
idlutils.wait_for_change(self.idl, self.timeout)
continue
self._is_running = False
break
errors = 0
txn = self.txns.get_nowait()
if txn is not None:
try:
@ -117,7 +140,11 @@ class Connection(object):
def queue_txn(self, txn):
# Even if we aren't started, we can queue a transaction and it will
# run when we are started
self.txns.put(txn)
try:
self.txns.put(txn, timeout=self.timeout)
except Queue.Full:
raise exceptions.TimeoutException(commands=txn.commands,
timeout=self.timeout)
class OvsdbIdl(idl.Idl):

View File

@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase):
cleanup_cmd = self.api.del_br(self.brname)
self.addCleanup(cleanup_cmd.execute)
def test_idl_run_exception_terminates(self):
run = self.api.idl.run
with mock.patch.object(self.api.idl, "run") as runmock:
exceptions = iter([Exception("TestException")])
def side_effect():
try:
raise next(exceptions)
except StopIteration:
return run()
runmock.side_effect = side_effect
exists = self.api.br_exists(self.brname).execute(check_error=True)
self.assertFalse(exists)
def test_br_exists_false(self):
exists = self.api.br_exists(self.brname).execute(check_error=True)
self.assertFalse(exists)

View File

@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase):
def test_queue_txn(self):
self.conn.start()
self.conn.queue_txn('blah')
self.conn.txns.put.assert_called_once_with('blah')
self.conn.txns.put.assert_called_once_with('blah',
timeout=self.conn.timeout)
class TestTransactionQueue(base.TestCase):