Merge "Ensure timeout on queueing transaction"
This commit is contained in:
commit
1c58eacc81
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import traceback
|
||||
|
@ -21,12 +22,15 @@ from ovs import poller
|
|||
from six.moves import queue as Queue
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import exceptions
|
||||
|
||||
if os.name == 'nt':
|
||||
from ovsdbapp.backend.ovs_idl.windows import connection_utils
|
||||
else:
|
||||
from ovsdbapp.backend.ovs_idl.linux import connection_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TransactionQueue(Queue.Queue, object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
@ -85,6 +89,7 @@ class Connection(object):
|
|||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
errors = 0
|
||||
while self._is_running:
|
||||
self.idl.wait(self.poller)
|
||||
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
||||
|
@ -92,7 +97,25 @@ class Connection(object):
|
|||
# is solved.
|
||||
self.poller.timer_wait(self.timeout * 1000)
|
||||
self.poller.block()
|
||||
self.idl.run()
|
||||
# If we fail on a run() call, we could have missed an update
|
||||
# from the server, leaving us out of sync with ovsdb-server.
|
||||
# It is not safe to continue without restarting the connection,
|
||||
# though it is likely that the error is unrecoverable, so only try
|
||||
# a few times before bailing completely.
|
||||
try:
|
||||
self.idl.run()
|
||||
except Exception as e:
|
||||
# This shouldn't happen, but is possible if there is a bug
|
||||
# in python-ovs
|
||||
errors += 1
|
||||
LOG.exception(e)
|
||||
if errors <= 3:
|
||||
self.idl.force_reconnect()
|
||||
idlutils.wait_for_change(self.idl, self.timeout)
|
||||
continue
|
||||
self._is_running = False
|
||||
break
|
||||
errors = 0
|
||||
txn = self.txns.get_nowait()
|
||||
if txn is not None:
|
||||
try:
|
||||
|
@ -117,7 +140,11 @@ class Connection(object):
|
|||
def queue_txn(self, txn):
|
||||
# Even if we aren't started, we can queue a transaction and it will
|
||||
# run when we are started
|
||||
self.txns.put(txn)
|
||||
try:
|
||||
self.txns.put(txn, timeout=self.timeout)
|
||||
except Queue.Full:
|
||||
raise exceptions.TimeoutException(commands=txn.commands,
|
||||
timeout=self.timeout)
|
||||
|
||||
|
||||
class OvsdbIdl(idl.Idl):
|
||||
|
|
|
@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase):
|
|||
cleanup_cmd = self.api.del_br(self.brname)
|
||||
self.addCleanup(cleanup_cmd.execute)
|
||||
|
||||
def test_idl_run_exception_terminates(self):
|
||||
run = self.api.idl.run
|
||||
with mock.patch.object(self.api.idl, "run") as runmock:
|
||||
exceptions = iter([Exception("TestException")])
|
||||
|
||||
def side_effect():
|
||||
try:
|
||||
raise next(exceptions)
|
||||
except StopIteration:
|
||||
return run()
|
||||
|
||||
runmock.side_effect = side_effect
|
||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||
self.assertFalse(exists)
|
||||
|
||||
def test_br_exists_false(self):
|
||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||
self.assertFalse(exists)
|
||||
|
|
|
@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase):
|
|||
def test_queue_txn(self):
|
||||
self.conn.start()
|
||||
self.conn.queue_txn('blah')
|
||||
self.conn.txns.put.assert_called_once_with('blah')
|
||||
self.conn.txns.put.assert_called_once_with('blah',
|
||||
timeout=self.conn.timeout)
|
||||
|
||||
|
||||
class TestTransactionQueue(base.TestCase):
|
||||
|
|
Loading…
Reference in New Issue