Don't create an OVSDB connection per API request

Using the default ovsdbapp.backend.ovs_idl.Backend behavior which
stores the ovsdb_connection on the Backend class itself ensures
that when Octavia does the imports necessary for instantiating the
driver, that the ovsdb_connection will remain on the class even
though the instances are deleted. This will allow us to reuse the
OVSDB connection between APIs despite octavia-api knowing nothing
about it.

In addition, __del__() can called from any thread by the garbage
collector, and so it was possible that thread.join() would be
called by the thread we were in, throwing an Exception and
preventing cleanup.

Co-authored-by: Gregory Thiemonge <gthiemon@redhat.com>
Co-authored-by: Ihar Hrachyshka <ihrachys@redhat.com>
Co-authored-by: Jakub Libosvar <jlibosva@redhat.com>

Closes-Bug: #2065460
Closes-Bug: #2065459

Change-Id: I4f5df34ab32e8b33ff54f311b9b2ac00c9be1e05
This commit is contained in:
Terry Wilson 2024-05-10 15:00:05 -05:00
parent b0bfda27d0
commit 9c2274e813
10 changed files with 45 additions and 108 deletions

View File

@ -14,6 +14,8 @@
from oslo_log import log as logging
from ovsdbapp.backend.ovs_idl import connection
from ovn_octavia_provider.common import config as ovn_conf
from ovn_octavia_provider import event as ovn_event
from ovn_octavia_provider import helper as ovn_helper
@ -44,7 +46,9 @@ def OvnProviderAgent(exit_event):
ovn_nb_idl_for_events = impl_idl_ovn.OvnNbIdlForLb(
event_lock_name=OVN_EVENT_LOCK_NAME)
ovn_nb_idl_for_events.notify_handler.watch_events(events)
ovn_nb_idl_for_events.start()
c = connection.Connection(ovn_nb_idl_for_events,
ovn_conf.get_ovn_ovsdb_timeout())
c.start()
ovn_sb_idl_for_events = impl_idl_ovn.OvnSbIdlForLb(
event_lock_name=OVN_EVENT_LOCK_NAME)
@ -64,7 +68,7 @@ def OvnProviderAgent(exit_event):
exit_event.wait()
LOG.info('OVN provider agent is exiting.')
ovn_nb_idl_for_events.notify_handler.unwatch_events(events)
ovn_nb_idl_for_events.stop()
c.stop()
ovn_sb_idl_for_events.notify_handler.unwatch_events(sb_events)
ovn_sb_idl_for_events.stop()
maintenance_thread.stop()

View File

@ -45,7 +45,7 @@ class OvnProviderDriver(driver_base.ProviderDriver):
# was imported, also to cover requirement from
# OvnProviderHelper and intra references modules
ovn_conf.register_opts()
self._ovn_helper = ovn_helper.OvnProviderHelper()
self._ovn_helper = ovn_helper.OvnProviderHelper(notifier=False)
def __del__(self):
self._ovn_helper.shutdown()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import copy
import queue
import re
@ -29,6 +30,7 @@ from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import strutils
from ovs.stream import Stream
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp.schema.ovn_northbound import commands as cmd
@ -50,7 +52,7 @@ LOG = logging.getLogger(__name__)
class OvnProviderHelper():
def __init__(self):
def __init__(self, notifier=True):
self.requests = queue.Queue()
self.helper_thread = threading.Thread(target=self.request_handler)
self.helper_thread.daemon = True
@ -58,9 +60,10 @@ class OvnProviderHelper():
self._check_and_set_ssl_files()
self._init_lb_actions()
# NOTE(mjozefcz): This API is only for handling octavia API requests.
self.ovn_nbdb = impl_idl_ovn.OvnNbIdlForLb()
self.ovn_nbdb_api = self.ovn_nbdb.start()
i = impl_idl_ovn.OvnNbIdlForLb(notifier=notifier)
c = connection.Connection(i, ovn_conf.get_ovn_ovsdb_timeout())
self.ovn_nbdb_api = impl_idl_ovn.OvsdbNbOvnIdl(c)
atexit.register(self.ovn_nbdb_api.ovsdb_connection.stop)
self.helper_thread.start()
@ -122,8 +125,6 @@ class OvnProviderHelper():
self.requests.put({'type': ovn_const.REQ_TYPE_EXIT},
timeout=ovn_const.MAX_TIMEOUT_REQUEST)
self.helper_thread.join()
self.ovn_nbdb.stop()
del self.ovn_nbdb_api
@staticmethod
def _map_val(row, col, key):

View File

@ -20,8 +20,10 @@ import netaddr
from neutron_lib import constants as n_const
from oslo_config import cfg
from oslo_log import log as logging
from ovsdbapp.backend.ovs_idl import connection
from ovn_octavia_provider.common import clients
from ovn_octavia_provider.common import config as ovn_conf
from ovn_octavia_provider.common import constants as ovn_const
from ovn_octavia_provider.ovsdb import impl_idl_ovn
@ -62,7 +64,9 @@ class DBInconsistenciesPeriodics(object):
def __init__(self):
self.ovn_nbdb = impl_idl_ovn.OvnNbIdlForLb()
self.ovn_nbdb_api = self.ovn_nbdb.start()
c = connection.Connection(self.ovn_nbdb,
ovn_conf.get_ovn_ovsdb_timeout())
self.ovn_nbdb_api = impl_idl_ovn.OvsdbNbOvnIdl(c)
@periodics.periodic(spacing=600, run_immediately=True)
def change_device_owner_lb_hm_ports(self):

View File

@ -31,7 +31,6 @@ from ovn_octavia_provider.common import config
from ovn_octavia_provider.common import exceptions as ovn_exc
from ovn_octavia_provider.common import utils
from ovn_octavia_provider.i18n import _
from ovn_octavia_provider.ovsdb import impl_idl_ovn
from ovn_octavia_provider.ovsdb import ovsdb_monitor
@ -52,34 +51,7 @@ class OvnNbTransaction(idl_trans.Transaction):
self.api.nb_global.increment('nb_cfg')
# This version of Backend doesn't use a class variable for ovsdb_connection
# and therefor allows networking-ovn to manage connection scope on its own
class Backend(ovs_idl.Backend):
lookup_table = {}
ovsdb_connection = None
def __init__(self, connection):
self.ovsdb_connection = connection
super().__init__(connection)
def start_connection(self, connection):
try:
self.ovsdb_connection.start()
except Exception as e:
connection_exception = OvsdbConnectionUnavailable(
db_schema=self.schema, error=e)
LOG.exception(connection_exception)
raise connection_exception from e
@property
def idl(self):
return self.ovsdb_connection.idl
@property
def tables(self):
return self.idl.tables
_tables = tables
def is_table_present(self, table_name):
return table_name in self._tables
@ -88,11 +60,6 @@ class Backend(ovs_idl.Backend):
return self.is_table_present(table_name) and (
col_name in self._tables[table_name].columns)
def create_transaction(self, check_error=False, log_errors=True):
return idl_trans.Transaction(
self, self.ovsdb_connection, self.ovsdb_connection.timeout,
check_error, log_errors)
# Check for a column match in the table. If not found do a retry with
# a stop delay of 10 secs. This function would be useful if the caller
# wants to verify for the presence of a particular row in the table
@ -244,40 +211,23 @@ class OvnNbIdlForLb(ovsdb_monitor.OvnIdl):
'Logical_Router', 'Logical_Switch_Port', 'Logical_Router_Port',
'Gateway_Chassis', 'NAT')
def __init__(self, event_lock_name=None):
def __init__(self, event_lock_name=None, notifier=True):
self.conn_string = config.get_ovn_nb_connection()
ovsdb_monitor._check_and_set_ssl_files(self.SCHEMA)
helper = self._get_ovsdb_helper(self.conn_string)
for table in OvnNbIdlForLb.TABLES:
helper.register_table(table)
super().__init__(
driver=None, remote=self.conn_string, schema=helper)
driver=None, remote=self.conn_string, schema=helper,
notifier=notifier)
self.event_lock_name = event_lock_name
if self.event_lock_name:
self.set_lock(self.event_lock_name)
atexit.register(self.stop)
@utils.retry()
def _get_ovsdb_helper(self, connection_string):
return idlutils.get_schema_helper(connection_string, self.SCHEMA)
def start(self):
self.conn = connection.Connection(
self, timeout=config.get_ovn_ovsdb_timeout())
return impl_idl_ovn.OvsdbNbOvnIdl(self.conn)
def stop(self):
# Close the running connection if it has been initalized
if hasattr(self, 'conn'):
if not self.conn.stop(timeout=config.get_ovn_ovsdb_timeout()):
LOG.debug("Connection terminated to OvnNb "
"but a thread is still alive")
del self.conn
# complete the shutdown for the event handler
self.notify_handler.shutdown()
# Close the idl session
self.close()
class OvnSbIdlForLb(ovsdb_monitor.OvnIdl):
SCHEMA = "OVN_Southbound"
@ -303,7 +253,7 @@ class OvnSbIdlForLb(ovsdb_monitor.OvnIdl):
def start(self):
self.conn = connection.Connection(
self, timeout=config.get_ovn_ovsdb_timeout())
return impl_idl_ovn.OvsdbSbOvnIdl(self.conn)
return OvsdbSbOvnIdl(self.conn)
def stop(self):
# Close the running connection if it has been initalized

View File

@ -38,10 +38,13 @@ class BaseOvnIdl(connection.OvsdbIdl):
class OvnIdl(BaseOvnIdl):
def __init__(self, driver, remote, schema):
def __init__(self, driver, remote, schema, notifier=True):
super().__init__(remote, schema)
self.driver = driver
self.notify_handler = OvnDbNotifyHandler(driver)
if notifier:
self.notify_handler = OvnDbNotifyHandler(driver)
else:
self.notify_handler = None
# ovsdb lock name to acquire.
# This event lock is used to handle the notify events sent by idl.Idl
# idl.Idl will call notify function for the "update" rpc method it
@ -66,6 +69,8 @@ class OvnIdl(BaseOvnIdl):
# but not granted by the ovsdb-server.
if self.is_lock_contended:
return
if not self.notify_handler:
return
row = idlutils.frozen_row(row)
self.notify_handler.notify(event, row, updates)

View File

@ -18,6 +18,7 @@ import multiprocessing as mp
from neutron.common import utils as n_utils
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import connection
from ovn_octavia_provider import agent as ovn_agent
from ovn_octavia_provider.common import config as ovn_config
@ -46,8 +47,10 @@ class TestOvnOctaviaProviderAgent(ovn_base.TestOvnOctaviaBase):
ovn_nb_idl_for_events = impl_idl_ovn.OvnNbIdlForLb(
event_lock_name='func_test')
ovn_nb_idl_for_events.notify_handler.watch_events(events)
ovn_nb_idl_for_events.start()
atexit.register(da_helper.shutdown)
c = connection.Connection(ovn_nb_idl_for_events,
ovn_config.get_ovn_ovsdb_timeout())
ovn_nbdb_api = impl_idl_ovn.OvsdbNbOvnIdl(c)
atexit.register(ovn_nbdb_api.ovsdb_connection.stop)
def _test_lrp_event_handler(self, cascade=False):
# Create Network N1 on router R1 and LBA on N1

View File

@ -43,6 +43,12 @@ class TestOvnOctaviaBase(base.BaseTestCase):
ovn_sb_idl = mock.patch(
'ovn_octavia_provider.ovsdb.impl_idl_ovn.OvnSbIdlForLb')
self.mock_ovn_sb_idl = ovn_sb_idl.start()
ovsdb_nb_idl = mock.patch(
'ovn_octavia_provider.ovsdb.impl_idl_ovn.OvsdbNbOvnIdl')
self.mock_ovsdb_nb_idl = ovsdb_nb_idl.start()
connection = mock.patch(
'ovsdbapp.backend.ovs_idl.connection.Connection')
self.mock_connection = connection.start()
self.member_address = '192.168.2.149'
self.vip_address = '192.148.210.109'
self.vip_dict = {'vip_network_id': uuidutils.generate_uuid(),

View File

@ -52,42 +52,6 @@ class TestOvnNbIdlForLb(base.BaseTestCase):
self.idl._get_ovsdb_helper('foo')
self.mock_gsh.assert_called_once_with('foo', 'OVN_Northbound')
@mock.patch.object(real_ovs_idl.Backend, 'autocreate_indices', mock.Mock(),
create=True)
def test_start(self):
with mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection',
side_effect=lambda x, timeout: mock.Mock()):
idl1 = impl_idl_ovn.OvnNbIdlForLb()
ret1 = idl1.start()
id1 = id(ret1.ovsdb_connection)
idl2 = impl_idl_ovn.OvnNbIdlForLb()
ret2 = idl2.start()
id2 = id(ret2.ovsdb_connection)
self.assertNotEqual(id1, id2)
@mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
def test_stop(self, mock_conn):
mock_conn.stop.return_value = False
with (
mock.patch.object(
self.idl.notify_handler, 'shutdown')) as mock_notify, (
mock.patch.object(self.idl, 'close')) as mock_close:
self.idl.start()
self.idl.stop()
mock_notify.assert_called_once_with()
mock_close.assert_called_once_with()
@mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
def test_stop_no_connection(self, mock_conn):
mock_conn.stop.return_value = False
with (
mock.patch.object(
self.idl.notify_handler, 'shutdown')) as mock_notify, (
mock.patch.object(self.idl, 'close')) as mock_close:
self.idl.stop()
mock_notify.assert_called_once_with()
mock_close.assert_called_once_with()
def test_setlock(self):
with mock.patch.object(impl_idl_ovn.OvnNbIdlForLb,
'set_lock') as set_lock:
@ -113,7 +77,7 @@ class TestOvnSbIdlForLb(base.BaseTestCase):
@mock.patch.object(real_ovs_idl.Backend, 'autocreate_indices', mock.Mock(),
create=True)
def test_start(self):
def test_start_reuses_connection(self):
with mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection',
side_effect=lambda x, timeout: mock.Mock()):
idl1 = impl_idl_ovn.OvnSbIdlForLb()
@ -122,7 +86,7 @@ class TestOvnSbIdlForLb(base.BaseTestCase):
idl2 = impl_idl_ovn.OvnSbIdlForLb()
ret2 = idl2.start()
id2 = id(ret2.ovsdb_connection)
self.assertNotEqual(id1, id2)
self.assertEqual(id1, id2)
@mock.patch('ovsdbapp.backend.ovs_idl.connection.Connection')
def test_stop(self, mock_conn):

View File

@ -4007,8 +4007,8 @@ class TestOvnProviderHelper(ovn_base.TestOvnOctaviaBase):
prov_helper1 = ovn_helper.OvnProviderHelper()
prov_helper2 = ovn_helper.OvnProviderHelper()
# One connection for API requests
self.assertIs(prov_helper1.ovn_nbdb_api,
prov_helper2.ovn_nbdb_api)
self.assertIs(prov_helper1.ovn_nbdb_api.ovsdb_connection,
prov_helper2.ovn_nbdb_api.ovsdb_connection)
# One connection to handle events
self.assertIs(prov_helper1.ovn_nbdb_api_for_events,
prov_helper2.ovn_nbdb_api_for_events)