Merge "Transport context to all threads"

This commit is contained in:
Zuul 2023-02-27 15:11:25 +00:00 committed by Gerrit Code Review
commit d443f8e4c4
4 changed files with 37 additions and 29 deletions

View File

@ -8913,7 +8913,8 @@ class ComputeManager(manager.Manager):
# in order to be able to track and abort it in the future. # in order to be able to track and abort it in the future.
self._waiting_live_migrations[instance.uuid] = (None, None) self._waiting_live_migrations[instance.uuid] = (None, None)
try: try:
future = self._live_migration_executor.submit( future = nova.utils.pass_context(
self._live_migration_executor.submit,
self._do_live_migration, context, dest, instance, self._do_live_migration, context, dest, instance,
block_migration, migration, migrate_data) block_migration, migration, migrate_data)
self._waiting_live_migrations[instance.uuid] = (migration, future) self._waiting_live_migrations[instance.uuid] = (migration, future)
@ -10197,7 +10198,9 @@ class ComputeManager(manager.Manager):
else: else:
LOG.debug('Triggering sync for uuid %s', uuid) LOG.debug('Triggering sync for uuid %s', uuid)
self._syncs_in_progress[uuid] = True self._syncs_in_progress[uuid] = True
self._sync_power_pool.spawn_n(_sync, db_instance) nova.utils.pass_context(self._sync_power_pool.spawn_n,
_sync,
db_instance)
def _query_driver_power_state_and_sync(self, context, db_instance): def _query_driver_power_state_and_sync(self, context, db_instance):
if db_instance.task_state is not None: if db_instance.task_state is not None:

View File

@ -2096,8 +2096,8 @@ class ComputeTaskManager:
skipped_host(target_ctxt, host, image_ids) skipped_host(target_ctxt, host, image_ids)
continue continue
fetch_pool.spawn_n(wrap_cache_images, target_ctxt, host, utils.pass_context(fetch_pool.spawn_n, wrap_cache_images,
image_ids) target_ctxt, host, image_ids)
# Wait until all those things finish # Wait until all those things finish
fetch_pool.waitall() fetch_pool.waitall()

View File

@ -9648,9 +9648,15 @@ class ComputeManagerMigrationTestCase(test.NoDBTestCase,
self.assertEqual(driver_console.get_connection_info.return_value, self.assertEqual(driver_console.get_connection_info.return_value,
console) console)
@mock.patch('nova.utils.pass_context')
@mock.patch('nova.compute.manager.ComputeManager.' @mock.patch('nova.compute.manager.ComputeManager.'
'_do_live_migration') '_do_live_migration')
def _test_max_concurrent_live(self, mock_lm): def _test_max_concurrent_live(self, mock_lm, mock_pass_context):
# pass_context wraps the function, which doesn't work with a mock
# So we simply mock it too
def _mock_pass_context(runner, func, *args, **kwargs):
return runner(func, *args, **kwargs)
mock_pass_context.side_effect = _mock_pass_context
@mock.patch('nova.objects.Migration.save') @mock.patch('nova.objects.Migration.save')
def _do_it(mock_mig_save): def _do_it(mock_mig_save):

View File

@ -632,15 +632,13 @@ def _serialize_profile_info():
return trace_info return trace_info
def spawn(func, *args, **kwargs): def pass_context(runner, func, *args, **kwargs):
"""Passthrough method for eventlet.spawn. """Generalised passthrough method
This utility exists so that it can be stubbed for testing without It will grab the context from the threadlocal store and add it to
interfering with the service spawns. the store on the runner. This allows for continuity in logging the
context when using this method to spawn a new thread through the
It will also grab the context from the threadlocal store and add it to runner function
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
""" """
_context = common_context.get_current() _context = common_context.get_current()
profiler_info = _serialize_profile_info() profiler_info = _serialize_profile_info()
@ -655,7 +653,21 @@ def spawn(func, *args, **kwargs):
profiler.init(**profiler_info) profiler.init(**profiler_info)
return func(*args, **kwargs) return func(*args, **kwargs)
return eventlet.spawn(context_wrapper, *args, **kwargs) return runner(context_wrapper, *args, **kwargs)
def spawn(func, *args, **kwargs):
"""Passthrough method for eventlet.spawn.
This utility exists so that it can be stubbed for testing without
interfering with the service spawns.
It will also grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
"""
return pass_context(eventlet.spawn, func, *args, **kwargs)
def spawn_n(func, *args, **kwargs): def spawn_n(func, *args, **kwargs):
@ -668,25 +680,12 @@ def spawn_n(func, *args, **kwargs):
the store on the new thread. This allows for continuity in logging the the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread. context when using this method to spawn a new thread.
""" """
_context = common_context.get_current() pass_context(eventlet.spawn_n, func, *args, **kwargs)
profiler_info = _serialize_profile_info()
@functools.wraps(func)
def context_wrapper(*args, **kwargs):
# NOTE: If update_store is not called after spawn_n it won't be
# available for the logger to pull from threadlocal storage.
if _context is not None:
_context.update_store()
if profiler_info and profiler:
profiler.init(**profiler_info)
func(*args, **kwargs)
eventlet.spawn_n(context_wrapper, *args, **kwargs)
def tpool_execute(func, *args, **kwargs): def tpool_execute(func, *args, **kwargs):
"""Run func in a native thread""" """Run func in a native thread"""
tpool.execute(func, *args, **kwargs) return pass_context(tpool.execute, func, *args, **kwargs)
def is_none_string(val): def is_none_string(val):