Merge "Ensure timeout on queueing transaction"
This commit is contained in:
commit
1c58eacc81
|
@ -12,6 +12,7 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
|
@ -21,12 +22,15 @@ from ovs import poller
|
||||||
from six.moves import queue as Queue
|
from six.moves import queue as Queue
|
||||||
|
|
||||||
from ovsdbapp.backend.ovs_idl import idlutils
|
from ovsdbapp.backend.ovs_idl import idlutils
|
||||||
|
from ovsdbapp import exceptions
|
||||||
|
|
||||||
if os.name == 'nt':
|
if os.name == 'nt':
|
||||||
from ovsdbapp.backend.ovs_idl.windows import connection_utils
|
from ovsdbapp.backend.ovs_idl.windows import connection_utils
|
||||||
else:
|
else:
|
||||||
from ovsdbapp.backend.ovs_idl.linux import connection_utils
|
from ovsdbapp.backend.ovs_idl.linux import connection_utils
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TransactionQueue(Queue.Queue, object):
|
class TransactionQueue(Queue.Queue, object):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
@ -85,6 +89,7 @@ class Connection(object):
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
errors = 0
|
||||||
while self._is_running:
|
while self._is_running:
|
||||||
self.idl.wait(self.poller)
|
self.idl.wait(self.poller)
|
||||||
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
||||||
|
@ -92,7 +97,25 @@ class Connection(object):
|
||||||
# is solved.
|
# is solved.
|
||||||
self.poller.timer_wait(self.timeout * 1000)
|
self.poller.timer_wait(self.timeout * 1000)
|
||||||
self.poller.block()
|
self.poller.block()
|
||||||
self.idl.run()
|
# If we fail on a run() call, we could have missed an update
|
||||||
|
# from the server, leaving us out of sync with ovsdb-server.
|
||||||
|
# It is not safe to continue without restarting the connection,
|
||||||
|
# though it is likely that the error is unrecoverable, so only try
|
||||||
|
# a few times before bailing completely.
|
||||||
|
try:
|
||||||
|
self.idl.run()
|
||||||
|
except Exception as e:
|
||||||
|
# This shouldn't happen, but is possible if there is a bug
|
||||||
|
# in python-ovs
|
||||||
|
errors += 1
|
||||||
|
LOG.exception(e)
|
||||||
|
if errors <= 3:
|
||||||
|
self.idl.force_reconnect()
|
||||||
|
idlutils.wait_for_change(self.idl, self.timeout)
|
||||||
|
continue
|
||||||
|
self._is_running = False
|
||||||
|
break
|
||||||
|
errors = 0
|
||||||
txn = self.txns.get_nowait()
|
txn = self.txns.get_nowait()
|
||||||
if txn is not None:
|
if txn is not None:
|
||||||
try:
|
try:
|
||||||
|
@ -117,7 +140,11 @@ class Connection(object):
|
||||||
def queue_txn(self, txn):
|
def queue_txn(self, txn):
|
||||||
# Even if we aren't started, we can queue a transaction and it will
|
# Even if we aren't started, we can queue a transaction and it will
|
||||||
# run when we are started
|
# run when we are started
|
||||||
self.txns.put(txn)
|
try:
|
||||||
|
self.txns.put(txn, timeout=self.timeout)
|
||||||
|
except Queue.Full:
|
||||||
|
raise exceptions.TimeoutException(commands=txn.commands,
|
||||||
|
timeout=self.timeout)
|
||||||
|
|
||||||
|
|
||||||
class OvsdbIdl(idl.Idl):
|
class OvsdbIdl(idl.Idl):
|
||||||
|
|
|
@ -40,6 +40,21 @@ class TestOvsdbIdl(base.FunctionalTestCase):
|
||||||
cleanup_cmd = self.api.del_br(self.brname)
|
cleanup_cmd = self.api.del_br(self.brname)
|
||||||
self.addCleanup(cleanup_cmd.execute)
|
self.addCleanup(cleanup_cmd.execute)
|
||||||
|
|
||||||
|
def test_idl_run_exception_terminates(self):
|
||||||
|
run = self.api.idl.run
|
||||||
|
with mock.patch.object(self.api.idl, "run") as runmock:
|
||||||
|
exceptions = iter([Exception("TestException")])
|
||||||
|
|
||||||
|
def side_effect():
|
||||||
|
try:
|
||||||
|
raise next(exceptions)
|
||||||
|
except StopIteration:
|
||||||
|
return run()
|
||||||
|
|
||||||
|
runmock.side_effect = side_effect
|
||||||
|
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||||
|
self.assertFalse(exists)
|
||||||
|
|
||||||
def test_br_exists_false(self):
|
def test_br_exists_false(self):
|
||||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||||
self.assertFalse(exists)
|
self.assertFalse(exists)
|
||||||
|
|
|
@ -49,7 +49,8 @@ class TestOVSNativeConnection(base.TestCase):
|
||||||
def test_queue_txn(self):
|
def test_queue_txn(self):
|
||||||
self.conn.start()
|
self.conn.start()
|
||||||
self.conn.queue_txn('blah')
|
self.conn.queue_txn('blah')
|
||||||
self.conn.txns.put.assert_called_once_with('blah')
|
self.conn.txns.put.assert_called_once_with('blah',
|
||||||
|
timeout=self.conn.timeout)
|
||||||
|
|
||||||
|
|
||||||
class TestTransactionQueue(base.TestCase):
|
class TestTransactionQueue(base.TestCase):
|
||||||
|
|
Loading…
Reference in New Issue