Ensure timeout on queueing transaction
If idl.run() somehow throws an Exception, the existing code would cause the Connection thread to die and then if more than one transaction was queued, it would hang forever. Instead, we enforce timeout on queing a transaction and log the exception that happens in the Connection thread to make debugging easier and try reconnecting on the off chance that the exception is recoverable. Change-Id: I14313df9a59424787230b3fbd630d925323969fe
This commit is contained in:
parent
f15d014fc7
commit
c3d639ab45
|
@ -12,6 +12,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import traceback
|
||||
|
@ -21,12 +22,15 @@ from ovs import poller
|
|||
from six.moves import queue as Queue
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import exceptions
|
||||
|
||||
if os.name == 'nt':
|
||||
from ovsdbapp.backend.ovs_idl.windows import connection_utils
|
||||
else:
|
||||
from ovsdbapp.backend.ovs_idl.linux import connection_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TransactionQueue(Queue.Queue, object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
@ -85,6 +89,7 @@ class Connection(object):
|
|||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
errors = 0
|
||||
while self._is_running:
|
||||
self.idl.wait(self.poller)
|
||||
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
||||
|
@ -92,7 +97,25 @@ class Connection(object):
|
|||
# is solved.
|
||||
self.poller.timer_wait(self.timeout * 1000)
|
||||
self.poller.block()
|
||||
self.idl.run()
|
||||
# If we fail on a run() call, we could have missed an update
|
||||
# from the server, leaving us out of sync with ovsdb-server.
|
||||
# It is not safe to continue without restarting the connection,
|
||||
# though it is likely that the error is unrecoverable, so only try
|
||||
# a few times before bailing completely.
|
||||
try:
|
||||
self.idl.run()
|
||||
except Exception as e:
|
||||
# This shouldn't happen, but is possible if there is a bug
|
||||
# in python-ovs
|
||||
errors += 1
|
||||
LOG.exception(e)
|
||||
if errors <= 3:
|
||||
self.idl.force_reconnect()
|
||||
idlutils.wait_for_change(self.idl, self.timeout)
|
||||
continue
|
||||
self._is_running = False
|
||||
break
|
||||
errors = 0
|
||||
txn = self.txns.get_nowait()
|
||||
if txn is not None:
|
||||
try:
|
||||
|
@ -117,7 +140,11 @@ class Connection(object):
|
|||
def queue_txn(self, txn):
|
||||
# Even if we aren't started, we can queue a transaction and it will
|
||||
# run when we are started
|
||||
self.txns.put(txn)
|
||||
try:
|
||||
self.txns.put(txn, timeout=self.timeout)
|
||||
except Queue.Full:
|
||||
raise exceptions.TimeoutException(commands=txn.commands,
|
||||
timeout=self.timeout)
|
||||
|
||||
|
||||
class OvsdbIdl(idl.Idl):
|
||||
|
|
|
@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase):
|
|||
cleanup_cmd = self.api.del_br(self.brname)
|
||||
self.addCleanup(cleanup_cmd.execute)
|
||||
|
||||
def test_idl_run_exception_terminates(self):
|
||||
run = self.api.idl.run
|
||||
with mock.patch.object(self.api.idl, "run") as runmock:
|
||||
exceptions = iter([Exception("TestException")])
|
||||
|
||||
def side_effect():
|
||||
try:
|
||||
raise next(exceptions)
|
||||
except StopIteration:
|
||||
return run()
|
||||
|
||||
runmock.side_effect = side_effect
|
||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||
self.assertFalse(exists)
|
||||
|
||||
def test_br_exists_false(self):
|
||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||
self.assertFalse(exists)
|
||||
|
|
|
@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase):
|
|||
def test_queue_txn(self):
|
||||
self.conn.start()
|
||||
self.conn.queue_txn('blah')
|
||||
self.conn.txns.put.assert_called_once_with('blah')
|
||||
self.conn.txns.put.assert_called_once_with('blah',
|
||||
timeout=self.conn.timeout)
|
||||
|
||||
|
||||
class TestTransactionQueue(base.TestCase):
|
||||
|
|
Loading…
Reference in New Issue