make libvirt driver get_connection thread-safe

libvirt driver's get_connection is not thread safe in the
presence of a libvirtd restart during concurrent incoming
requests.

With existing code each will in turn call get_connection,
find the connection is broken, try to create new one, block
for a while and yield to the next thread to do the same.
You get as many connections as there are incoming requests
and only the last one is used finally. If enough are incoming
these connections can exhaust the client pool configured
for libvirtd.
One fix is to hold a lock while creating the connection.
Note that has_min_version calls _conn which calls get_connection
and thus the direct call to _has_min_version()

Also added the exception text if it fails to register an event
handler for lifecycle events.

Change-Id: I090765802bfe443440f16722bc7c43b6280fe56a
Fixes: bug #1240905
(cherry picked from commit b2e64e3798)
This commit is contained in:
Tom Hancock 2013-10-17 09:48:54 +00:00 committed by Matt Riedemann
parent 0e4dd1bfc4
commit 98ab49bbb2
2 changed files with 74 additions and 20 deletions

View File

@ -5574,6 +5574,54 @@ class LibvirtConnTestCase(test.TestCase):
self.assertEqual('foo', conn.get_hypervisor_hostname())
self.assertEqual('foo', conn.get_hypervisor_hostname())
def test_get_connection_serial(self):
def get_conn_currency(driver):
driver._conn.getLibVersion()
def connect_with_block(*a, **k):
# enough to allow another connect to run
eventlet.sleep(0)
self.calls += 1
return self.conn
self.calls = 0
self.stubs.Set(libvirt_driver.LibvirtDriver,
'_connect', connect_with_block)
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
# call serially
get_conn_currency(driver)
get_conn_currency(driver)
self.assertEqual(self.calls, 1)
def test_get_connection_concurrency(self):
def get_conn_currency(driver):
driver._conn.getLibVersion()
def connect_with_block(*a, **k):
# enough to allow another connect to run
eventlet.sleep(0)
self.calls += 1
return self.conn
self.calls = 0
self.stubs.Set(libvirt_driver.LibvirtDriver,
'_connect', connect_with_block)
driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
# call concurrently
thr1 = eventlet.spawn(get_conn_currency, driver=driver)
thr2 = eventlet.spawn(get_conn_currency, driver=driver)
# let threads run
eventlet.sleep(0)
thr1.wait()
thr2.wait()
self.assertEqual(self.calls, 1)
def test_post_live_migration_at_destination_with_block_device_info(self):
# Preparing mocks
dummyxml = ("<domain type='kvm'><name>instance-00000001</name>"

View File

@ -398,20 +398,21 @@ class LibvirtDriver(driver.ComputeDriver):
driver_cache)
conf.driver_cache = cache_mode
def has_min_version(self, lv_ver=None, hv_ver=None, hv_type=None):
@staticmethod
def _has_min_version(conn, lv_ver=None, hv_ver=None, hv_type=None):
try:
if lv_ver is not None:
libvirt_version = self._conn.getLibVersion()
libvirt_version = conn.getLibVersion()
if libvirt_version < utils.convert_version_to_int(lv_ver):
return False
if hv_ver is not None:
hypervisor_version = self._conn.getVersion()
hypervisor_version = conn.getVersion()
if hypervisor_version < utils.convert_version_to_int(hv_ver):
return False
if hv_type is not None:
hypervisor_type = self._conn.getType()
hypervisor_type = conn.getType()
if hypervisor_type != hv_type:
return False
@ -419,6 +420,9 @@ class LibvirtDriver(driver.ComputeDriver):
except Exception:
return False
def has_min_version(self, lv_ver=None, hv_ver=None, hv_type=None):
return self._has_min_version(self._conn, lv_ver, hv_ver, hv_type)
def _native_thread(self):
"""Receives async events coming in from libvirtd.
@ -577,18 +581,18 @@ class LibvirtDriver(driver.ComputeDriver):
self._init_events()
def _get_connection(self):
# multiple concurrent connections are protected by _wrapped_conn_lock
with self._wrapped_conn_lock:
wrapped_conn = self._wrapped_conn
if not wrapped_conn or not self._test_connection(wrapped_conn):
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
if not CONF.libvirt_nonblocking:
wrapped_conn = self._connect(self.uri(), self.read_only)
else:
wrapped_conn = tpool.proxy_call(
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri(), self.read_only)
with self._wrapped_conn_lock:
if not wrapped_conn or not self._test_connection(wrapped_conn):
LOG.debug(_('Connecting to libvirt: %s'), self.uri())
if not CONF.libvirt_nonblocking:
wrapped_conn = self._connect(self.uri(), self.read_only)
else:
wrapped_conn = tpool.proxy_call(
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri(), self.read_only)
self._wrapped_conn = wrapped_conn
try:
@ -599,19 +603,21 @@ class LibvirtDriver(driver.ComputeDriver):
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
self._event_lifecycle_callback,
self)
except Exception:
LOG.warn(_("URI %s does not support events"),
self.uri())
except Exception as e:
LOG.warn(_("URI %(uri)s does not support events: %(error)s"),
{'uri': self.uri(), 'error': e})
if self.has_min_version(MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
if self._has_min_version(wrapped_conn,
MIN_LIBVIRT_CLOSE_CALLBACK_VERSION):
try:
LOG.debug(_("Registering for connection events: %s") %
str(self))
wrapped_conn.registerCloseCallback(
self._close_callback, None)
except libvirt.libvirtError:
LOG.debug(_("URI %s does not support connection events"),
self.uri())
except libvirt.libvirtError as e:
LOG.warn(_("URI %(uri)s does not support connection"
" events: %(error)s"),
{'uri': self.uri(), 'error': e})
return wrapped_conn