libvirt: add AsyncDeviceEventsHandler

This class will be used as synchronization point between the device
detach implementation and the generic libvirt event handling in the
libvirt driver. This later makes it possible to wait for libvirt device
removal events during device detach in a thread / greenlet safe way.

The basic usage pattern is the following:

setup:
  handler = AsyncDeviceEventsHandler()

thread1:

  waiter = handler.create_waiter(<instance>, <device>, <event types>)
  # initiate detach in libvirt
  detach()
  # block until the event is received
  event = handler.wait(waiter, timeout=20)

thread2:

  # at some point after detach() in thread1
  handler.notify_waiters(event)

This is part of the longer series trying to transform the existing
device detach handling to use libvirt events.

Change-Id: I7fc1ba2d2cbdb3f46f3649b1279b0c1a40457647
Related-Bug: #1882521
This commit is contained in:
Balazs Gibizer 2021-01-25 17:34:56 +01:00
parent 4a70fc9cfb
commit f5cd6e2dea
2 changed files with 278 additions and 0 deletions

View File

@ -27296,3 +27296,143 @@ class LibvirtDeviceRemoveEventTestCase(test.NoDBTestCase):
drvr.emit_event(event)
mock_base_handles.assert_not_called()
mock_debug.assert_not_called()
class AsyncDeviceEventsHandlerTestCase(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.handler = libvirt_driver.AsyncDeviceEventsHandler()
def assert_handler_clean(self):
self.assertEqual(set(), self.handler._waiters)
def _call_parallel_after_a_delay(self, func):
def run():
time.sleep(0.1)
func()
thread = threading.Thread(target=run)
thread.start()
return thread
def test_event_received_after_wait(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
thread = self._call_parallel_after_a_delay(
lambda: self.handler.notify_waiters(sent_event))
received_event = self.handler.wait(waiter, timeout=0.2)
thread.join()
self.assertEqual(sent_event, received_event)
self.assert_handler_clean()
def test_event_received_before_wait(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
received_event = self.handler.wait(waiter, timeout=0.1)
self.assertTrue(had_waiter)
self.assertEqual(sent_event, received_event)
self.assert_handler_clean()
def test_event_not_received(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
received_event = self.handler.wait(waiter, timeout=0.1)
self.assertIsNone(received_event)
self.assert_handler_clean()
def test_event_received_without_waiter(self):
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
self.assertFalse(had_waiter)
self.assert_handler_clean()
def test_create_remove_waiter_without_event(self):
waiter = self.handler.create_waiter(
uuids.instance, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
self.handler.delete_waiter(waiter)
self.assert_handler_clean()
def test_waiter_cleanup(self):
inst1_dev1_waiter = self.handler.create_waiter(
uuids.instance1, 'virtio-1', {libvirtevent.DeviceRemovedEvent})
inst1_dev2_waiter = self.handler.create_waiter(
uuids.instance1,
'virtio-2',
{libvirtevent.DeviceRemovedEvent,
libvirtevent.DeviceRemovalFailedEvent})
inst2_waiter = self.handler.create_waiter(
uuids.instance2,
'virtio-1',
{libvirtevent.DeviceRemovalFailedEvent})
self.handler.notify_waiters(
libvirtevent.DeviceRemovedEvent(uuids.instance1, 'virtio-2'))
self.handler.notify_waiters(
libvirtevent.DeviceRemovedEvent(uuids.instance2, 'virtio-1'))
self.assertEqual(3, len(self.handler._waiters))
self.handler.delete_waiter(inst2_waiter)
self.assertEqual(2, len(self.handler._waiters))
self.handler.cleanup_waiters(uuids.instance1)
# we expect that the waiters are unblocked by the cleanup
self.assertTrue(inst1_dev1_waiter.threading_event.wait())
self.assertTrue(inst1_dev2_waiter.threading_event.wait())
self.assert_handler_clean()
def test_multiple_clients_for_the_same_event(self):
waiter1 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovedEvent,
libvirtevent.DeviceRemovalFailedEvent}
)
waiter2 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovedEvent}
)
waiter3 = self.handler.create_waiter(
uuids.instance,
'virtio-1',
{libvirtevent.DeviceRemovalFailedEvent}
)
sent_event = libvirtevent.DeviceRemovedEvent(
uuids.instance, 'virtio-1')
had_waiter = self.handler.notify_waiters(sent_event)
received_event1 = self.handler.wait(waiter1, timeout=0.1)
received_event2 = self.handler.wait(waiter2, timeout=0.1)
received_event3 = self.handler.wait(waiter3, timeout=0.1)
self.assertTrue(had_waiter)
self.assertEqual(sent_event, received_event1)
self.assertEqual(sent_event, received_event2)
# the third client timed out
self.assertIsNone(received_event3)
self.assert_handler_clean()

View File

@ -41,6 +41,7 @@ import random
import shutil
import sys
import tempfile
import threading
import time
import typing as ty
import uuid
@ -242,6 +243,143 @@ VGPU_RESOURCE_SEMAPHORE = 'vgpu_resources'
LIBVIRT_PERF_EVENT_PREFIX = 'VIR_PERF_PARAM_'
class AsyncDeviceEventsHandler:
"""A synchornization point between libvirt events an clients waiting for
such events.
It provides an interface for the clients to wait for one or more libvirt
event types. It implements event delivery by expecting the libvirt driver
to forward libvirt specific events to notify_waiters()
It handles multiple clients for the same instance, device and event
type and delivers the event to each clients.
"""
class Waiter:
def __init__(
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
):
self.instance_uuid = instance_uuid
self.device_name = device_name
self.event_types = event_types
self.threading_event = threading.Event()
self.result: ty.Optional[libvirtevent.DeviceEvent] = None
def matches(self, event: libvirtevent.DeviceEvent) -> bool:
"""Returns true if the event is one of the expected event types
for the given instance and device.
"""
return (
self.instance_uuid == event.uuid and
self.device_name == event.dev and
isinstance(event, tuple(self.event_types)))
def __repr__(self) -> str:
return (
"AsyncDeviceEventsHandler.Waiter("
f"instance_uuid={self.instance_uuid}, "
f"device_name={self.device_name}, "
f"event_types={self.event_types})")
def __init__(self):
self._lock = threading.Lock()
# Ongoing device operations in libvirt where we wait for the events
# about success or failure.
self._waiters: ty.Set[AsyncDeviceEventsHandler.Waiter] = set()
def create_waiter(
self,
instance_uuid: str,
device_name: str,
event_types: ty.Set[ty.Type[libvirtevent.DeviceEvent]]
) -> 'AsyncDeviceEventsHandler.Waiter':
"""Returns an opaque token the caller can use in wait() to
wait for the libvirt event
:param instance_uuid: The UUID of the instance.
:param device_name: The device name alias used by libvirt for this
device.
:param event_type: A set of classes derived from DeviceEvent
specifying which event types the caller waits for. Specifying more
than one event type means waiting for either of the events to be
received.
:returns: an opaque token to be used with wait_for_event().
"""
waiter = AsyncDeviceEventsHandler.Waiter(
instance_uuid, device_name, event_types)
with self._lock:
self._waiters.add(waiter)
return waiter
def delete_waiter(self, token: 'AsyncDeviceEventsHandler.Waiter'):
"""Deletes the waiter
:param token: the opaque token returned by create_waiter() to be
deleted
"""
with self._lock:
self._waiters.remove(token)
def wait(
self, token: 'AsyncDeviceEventsHandler.Waiter', timeout: float,
) -> ty.Optional[libvirtevent.DeviceEvent]:
"""Blocks waiting for the libvirt event represented by the opaque token
:param token: A token created by calling create_waiter()
:param timeout: Maximum number of seconds this call blocks waiting for
the event to be received
:returns: The received libvirt event, or None in case of timeout
"""
token.threading_event.wait(timeout)
with self._lock:
self._waiters.remove(token)
return token.result
def notify_waiters(self, event: libvirtevent.DeviceEvent) -> bool:
"""Unblocks the client waiting for this event.
:param event: the libvirt event that is received
:returns: True if there was a client waiting and False otherwise.
"""
dispatched = False
with self._lock:
for waiter in self._waiters:
if waiter.matches(event):
waiter.result = event
waiter.threading_event.set()
dispatched = True
return dispatched
def cleanup_waiters(self, instance_uuid: str) -> None:
"""Deletes all waiters and unblock all clients related to the specific
instance.
param instance_uuid: The instance UUID for which the cleanup is
requested
"""
with self._lock:
instance_waiters = set()
for waiter in self._waiters:
if waiter.instance_uuid == instance_uuid:
# unblock any waiting thread
waiter.threading_event.set()
instance_waiters.add(waiter)
self._waiters -= instance_waiters
if instance_waiters:
LOG.debug(
'Cleaned up device related libvirt event waiters: %s',
instance_waiters)
class LibvirtDriver(driver.ComputeDriver):
def __init__(self, virtapi, read_only=False):
# NOTE(aspiers) Some of these are dynamic, so putting