Merge "Ensure timeout on queueing transaction"

This commit is contained in:
Zuul 2018-08-27 10:01:49 +00:00 committed by Gerrit Code Review
commit 1c58eacc81
3 changed files with 46 additions and 3 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import threading
import traceback
@ -21,12 +22,15 @@ from ovs import poller
from six.moves import queue as Queue
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import exceptions
if os.name == 'nt':
from ovsdbapp.backend.ovs_idl.windows import connection_utils
else:
from ovsdbapp.backend.ovs_idl.linux import connection_utils
LOG = logging.getLogger(__name__)
class TransactionQueue(Queue.Queue, object):
def __init__(self, *args, **kwargs):
@ -85,6 +89,7 @@ class Connection(object):
self.thread.start()
def run(self):
errors = 0
while self._is_running:
self.idl.wait(self.poller)
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
@ -92,7 +97,25 @@ class Connection(object):
# is solved.
self.poller.timer_wait(self.timeout * 1000)
self.poller.block()
self.idl.run()
# If we fail on a run() call, we could have missed an update
# from the server, leaving us out of sync with ovsdb-server.
# It is not safe to continue without restarting the connection,
# though it is likely that the error is unrecoverable, so only try
# a few times before bailing completely.
try:
self.idl.run()
except Exception as e:
# This shouldn't happen, but is possible if there is a bug
# in python-ovs
errors += 1
LOG.exception(e)
if errors <= 3:
self.idl.force_reconnect()
idlutils.wait_for_change(self.idl, self.timeout)
continue
self._is_running = False
break
errors = 0
txn = self.txns.get_nowait()
if txn is not None:
try:
@ -117,7 +140,11 @@ class Connection(object):
def queue_txn(self, txn):
# Even if we aren't started, we can queue a transaction and it will
# run when we are started
self.txns.put(txn)
try:
self.txns.put(txn, timeout=self.timeout)
except Queue.Full:
raise exceptions.TimeoutException(commands=txn.commands,
timeout=self.timeout)
class OvsdbIdl(idl.Idl):

View File

@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase):
cleanup_cmd = self.api.del_br(self.brname)
self.addCleanup(cleanup_cmd.execute)
def test_idl_run_exception_terminates(self):
run = self.api.idl.run
with mock.patch.object(self.api.idl, "run") as runmock:
exceptions = iter([Exception("TestException")])
def side_effect():
try:
raise next(exceptions)
except StopIteration:
return run()
runmock.side_effect = side_effect
exists = self.api.br_exists(self.brname).execute(check_error=True)
self.assertFalse(exists)
def test_br_exists_false(self):
exists = self.api.br_exists(self.brname).execute(check_error=True)
self.assertFalse(exists)

View File

@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase):
def test_queue_txn(self):
self.conn.start()
self.conn.queue_txn('blah')
self.conn.txns.put.assert_called_once_with('blah')
self.conn.txns.put.assert_called_once_with('blah',
timeout=self.conn.timeout)
class TestTransactionQueue(base.TestCase):