Allow to stop and restart connections in Backend
Add a stop method to connections which effectively stops the thread running. A connection can also be restarted. Change-Id: Ifc74a834ca1a7d914c7ca66d71b47694f9eef1e5
This commit is contained in:
parent
7a89e9fffa
commit
a07950c24a
|
@ -42,6 +42,11 @@ class Backend(object):
|
|||
LOG.exception(connection_exception)
|
||||
raise connection_exception
|
||||
|
||||
@classmethod
|
||||
def restart_connection(cls):
|
||||
cls.ovsdb_connection.stop()
|
||||
cls.ovsdb_connection.start()
|
||||
|
||||
@property
|
||||
def idl(self):
|
||||
return self.__class__.ovsdb_connection.idl
|
||||
|
|
|
@ -21,6 +21,7 @@ from ovs import poller
|
|||
from six.moves import queue as Queue
|
||||
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import exceptions
|
||||
|
||||
if os.name == 'nt':
|
||||
from ovsdbapp.backend.ovs_idl.windows import connection_utils
|
||||
|
@ -64,6 +65,7 @@ class Connection(object):
|
|||
self.lock = threading.Lock()
|
||||
self.idl = idl
|
||||
self.thread = None
|
||||
self._is_running = None
|
||||
|
||||
def start(self):
|
||||
"""Start the connection."""
|
||||
|
@ -78,12 +80,13 @@ class Connection(object):
|
|||
# An ovs.db.Idl class has no post_connect
|
||||
pass
|
||||
self.poller = poller.Poller()
|
||||
self._is_running = True
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.thread.setDaemon(True)
|
||||
self.thread.start()
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
while self._is_running:
|
||||
self.idl.wait(self.poller)
|
||||
self.poller.fd_wait(self.txns.alert_fileno, poller.POLLIN)
|
||||
# TODO(jlibosva): Remove next line once losing connection to ovsdb
|
||||
|
@ -101,7 +104,20 @@ class Connection(object):
|
|||
txn.results.put(er)
|
||||
self.txns.task_done()
|
||||
|
||||
def stop(self, timeout=None):
|
||||
if not self._is_running:
|
||||
return True
|
||||
self._is_running = False
|
||||
self.txns.put(None)
|
||||
self.thread.join(timeout)
|
||||
if self.thread.is_alive():
|
||||
return False
|
||||
self.thread = None
|
||||
return True
|
||||
|
||||
def queue_txn(self, txn):
|
||||
if not self._is_running:
|
||||
raise exceptions.NotConnectedError(txn=txn)
|
||||
self.txns.put(txn)
|
||||
|
||||
|
||||
|
|
|
@ -59,3 +59,7 @@ class OvsdbConnectionUnavailable(OvsdbAppException):
|
|||
"'%(error)s'. Verify that the OVS and OVN services are "
|
||||
"available and that the 'ovn_nb_connection' and "
|
||||
"'ovn_sb_connection' configuration options are correct.")
|
||||
|
||||
|
||||
class NotConnectedError(OvsdbAppException):
|
||||
message = "Cannot commit transaction %(txn)s. Not connected"
|
||||
|
|
|
@ -54,10 +54,12 @@ class FunctionalTestCase(base.TestCase):
|
|||
return
|
||||
cls._connections = {}
|
||||
for schema in cls.schemas:
|
||||
idl = connection.OvsdbIdl.from_server(cls.schema_map[schema],
|
||||
schema)
|
||||
cls._connections[schema] = connection.Connection(
|
||||
idl, constants.DEFAULT_TIMEOUT)
|
||||
cls._connections[schema] = cls.create_connection(schema)
|
||||
|
||||
@classmethod
|
||||
def create_connection(cls, schema):
|
||||
idl = connection.OvsdbIdl.from_server(cls.schema_map[schema], schema)
|
||||
return connection.Connection(idl, constants.DEFAULT_TIMEOUT)
|
||||
|
||||
def setUp(self):
|
||||
super(FunctionalTestCase, self).setUp()
|
||||
|
|
|
@ -101,6 +101,22 @@ class TestOvsdbIdl(base.FunctionalTestCase):
|
|||
cmd = self.api.del_port(utils.get_rand_device_name(), if_exists=False)
|
||||
self.assertRaises(RuntimeError, cmd.execute, check_error=True)
|
||||
|
||||
def test_connection_reconnect(self):
|
||||
self.api.ovsdb_connection.stop()
|
||||
existsCmd = self.api.br_exists(self.brname)
|
||||
self.assertRaises(exc.NotConnectedError,
|
||||
existsCmd.execute, check_error=True)
|
||||
self.api.ovsdb_connection.start()
|
||||
exists = self.api.br_exists(self.brname).execute(check_error=True)
|
||||
self.assertFalse(exists)
|
||||
|
||||
def test_connection_disconnect_timeout(self):
|
||||
_is_running_mock = mock.PropertyMock(return_value=True)
|
||||
connection = self.api.ovsdb_connection
|
||||
type(connection)._is_running = _is_running_mock
|
||||
self.addCleanup(delattr, type(connection), '_is_running')
|
||||
self.assertFalse(connection.stop(1))
|
||||
|
||||
|
||||
class ImplIdlTestCase(base.FunctionalTestCase):
|
||||
schemas = ['Open_vSwitch']
|
||||
|
|
|
@ -47,6 +47,7 @@ class TestOVSNativeConnection(base.TestCase):
|
|||
mock_thread.return_value.start.assert_called_once_with()
|
||||
|
||||
def test_queue_txn(self):
|
||||
self.conn.start()
|
||||
self.conn.queue_txn('blah')
|
||||
self.conn.txns.put.assert_called_once_with('blah')
|
||||
|
||||
|
|
Loading…
Reference in New Issue