From 4da3b5751a09b0ac748b75a6b7d93f809beda26e Mon Sep 17 00:00:00 2001 From: Kirill Zaitsev Date: Wed, 30 Dec 2015 20:16:17 +0300 Subject: [PATCH] Attempt deleting stale packages from cache at download time This commit modifies cleanup method of API package loader to include cleaning up of stale downloaded packages. This is done with respect to IPC and green-thread usage locks. Implements blueprint: murano-engine-package-cache Change-Id: I09a08c3646c32666893b5ed35463f8ec5e413284 --- murano/common/config.py | 2 +- murano/engine/package_loader.py | 133 +++++++++++++++--- .../tests/unit/engine/test_package_loader.py | 98 +++++++++++++ murano/utils.py | 105 ++++++++++++++ .../notes/package_cache-68495dcde223c167.yaml | 7 + 5 files changed, 324 insertions(+), 21 deletions(-) create mode 100644 releasenotes/notes/package_cache-68495dcde223c167.yaml diff --git a/murano/common/config.py b/murano/common/config.py index d3d34e622..fd69ca93b 100644 --- a/murano/common/config.py +++ b/murano/common/config.py @@ -220,7 +220,7 @@ packages_opts = [ cfg.StrOpt('packages_cache', help='Location (directory) for Murano package cache.'), - cfg.BoolOpt('enable_package_cache', default=True, + cfg.BoolOpt('enable_packages_cache', default=True, help=_('Enables murano-engine to persist on disk ' 'packages downloaded during deployments. ' 'The packages would be re-used for consequent ' diff --git a/murano/engine/package_loader.py b/murano/engine/package_loader.py index dad42c6bd..df8cf02ff 100644 --- a/murano/engine/package_loader.py +++ b/murano/engine/package_loader.py @@ -14,7 +14,7 @@ # limitations under the License. 
import collections -import errno +import itertools import os import os.path import shutil @@ -24,7 +24,6 @@ import uuid import eventlet from muranoclient.common import exceptions as muranoclient_exc -from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging import six @@ -39,11 +38,13 @@ from murano.engine.system import system_objects from murano.engine import yaql_yaml_loader from murano.packages import exceptions as pkg_exc from murano.packages import load_utils +from murano import utils as m_utils CONF = cfg.CONF LOG = logging.getLogger(__name__) -download_greenlocks = collections.defaultdict(lockutils.ReaderWriterLock) +download_mem_locks = collections.defaultdict(m_utils.ReaderWriterLock) +usage_mem_locks = collections.defaultdict(m_utils.ReaderWriterLock) class ApiPackageLoader(package_loader.MuranoPackageLoader): @@ -55,6 +56,10 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): self._package_cache = {} self._root_loader = root_loader or self + self._mem_locks = [] + self._ipc_locks = [] + self._downloaded = [] + def load_class_package(self, class_name, version_spec): packages = self._class_cache.get(class_name) if packages: @@ -67,6 +72,7 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): version_spec)} try: package_definition = self._get_definition(filter_opts) + self._lock_usage(package_definition) except LookupError: exc_info = sys.exc_info() raise (exceptions.NoPackageForClassFound(class_name), @@ -86,6 +92,7 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): version_spec)} try: package_definition = self._get_definition(filter_opts) + self._lock_usage(package_definition) except LookupError: exc_info = sys.exc_info() six.reraise(exceptions.NoPackageFound(package_name), @@ -106,21 +113,14 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): os.path.join(tempfile.gettempdir(), 'murano-packages-cache') ) - if CONF.packages_opts.enable_package_cache: + if 
CONF.packages_opts.enable_packages_cache: directory = os.path.abspath(base_directory) else: directory = os.path.abspath(os.path.join(base_directory, uuid.uuid4().hex)) if not os.path.isdir(directory): - # NOTE(kzaitsev): in case we want packages to persist on - # disk and subsequent call to makedirs might fail if 2+ loaders - # from different processes would attempt to call it simultaneously - try: - os.makedirs(directory) - except OSError as e: - if e.errno != errno.EEXIST: - raise + m_utils.ensure_tree(directory) LOG.debug('Cache for package loader is located at: {dir}'.format( dir=directory)) @@ -163,23 +163,25 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): package_directory = os.path.join( self._cache_directory, package_def.fully_qualified_name, + getattr(package_def, 'version', '0.0.0'), package_id) if os.path.isdir(package_directory): try: return load_utils.load_from_dir(package_directory) except pkg_exc.PackageLoadError: - LOG.error(_LE('Unable to load package from cache. Clean-up.')) + LOG.exception( + _LE('Unable to load package from cache. Clean-up.')) shutil.rmtree(package_directory, ignore_errors=True) # the package is not yet in cache, let's try and download it. 
- download_flock_path = os.path.join( + download_lock_path = os.path.join( self._cache_directory, '{}_download.lock'.format(package_id)) - download_flock = lockutils.InterProcessLock( - path=download_flock_path, sleep_func=eventlet.sleep) + download_ipc_lock = m_utils.ExclusiveInterProcessLock( + path=download_lock_path, sleep_func=eventlet.sleep) - with download_greenlocks[package_id].write_lock(),\ - download_flock: + with download_mem_locks[package_id].write_lock(),\ + download_ipc_lock: # NOTE(kzaitsev): # in case there were 2 concurrent threads/processes one might have @@ -216,6 +218,11 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): LOG.info(_LI( "Successfully downloaded and unpacked package {} {}") .format(package_def.fully_qualified_name, package_id)) + self._downloaded.append(app_package) + + self.try_cleanup_cache( + os.path.split(package_directory)[0], + current_id=package_id) return app_package except IOError: msg = 'Unable to extract package data for %s' % package_id @@ -228,6 +235,42 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader): except OSError: pass + def try_cleanup_cache(self, package_directory=None, current_id=None): + if not package_directory: + return + + pkg_ids_listed = set() + try: + pkg_ids_listed = set(os.listdir(package_directory)) + except OSError: + # No directory for this package, probably someone + # already deleted everything. 
Anyway nothing to delete
+            return
+
+        # if current_id was given: ensure it's not checked for removal
+        pkg_ids_listed -= {current_id}
+
+        for pkg_id in pkg_ids_listed:
+            stale_directory = os.path.join(
+                package_directory,
+                pkg_id)
+            if os.path.isdir(stale_directory):
+
+                usage_lock_path = os.path.join(
+                    self._cache_directory,
+                    '{}_usage.lock'.format(pkg_id))
+                ipc_lock = m_utils.ExclusiveInterProcessLock(
+                    path=usage_lock_path, sleep_func=eventlet.sleep)
+
+                with usage_mem_locks[pkg_id].write_lock(False) as acquired:
+                    if not acquired:
+                        continue
+                    acquired_ipc_lock = ipc_lock.acquire(blocking=False)
+                    if acquired_ipc_lock:
+                        shutil.rmtree(stale_directory,
+                                      ignore_errors=True)
+                        ipc_lock.release()
+
     def _get_best_package_match(self, packages):
         public = None
         other = []
@@ -243,9 +286,43 @@ class ApiPackageLoader(package_loader.MuranoPackageLoader):
         elif other:
             return other[0]
 
+    def _lock_usage(self, package_definition):
+        """Locks package for usage"""
+
+        # do not lock anything if we do not persist packages on disk
+        if not CONF.packages_opts.enable_packages_cache:
+            return
+
+        # A work around the fact that read_lock only supports `with` syntax. 
+        package_id = package_definition.id
+        mem_lock = _with_to_generator(
+            usage_mem_locks[package_id].read_lock())
+
+        usage_lock_path = os.path.join(self._cache_directory,
+                                       '{}_usage.lock'.format(package_id))
+        ipc_lock = m_utils.SharedInterProcessLock(
+            path=usage_lock_path,
+            sleep_func=eventlet.sleep
+        )
+        ipc_lock = _with_to_generator(ipc_lock)
+
+        next(mem_lock)
+        next(ipc_lock)
+        self._mem_locks.append(mem_lock)
+        self._ipc_locks.append(ipc_lock)
+
     def cleanup(self):
-        if not CONF.packages_opts.enable_package_cache:
+        """Cleans up any lock we had acquired and removes any stale packages"""
+
+        if not CONF.packages_opts.enable_packages_cache:
             shutil.rmtree(self._cache_directory, ignore_errors=True)
+            return
+
+        for lock in itertools.chain(self._mem_locks, self._ipc_locks):
+            try:
+                next(lock)
+            except StopIteration:
+                continue
 
     def __enter__(self):
         return self
@@ -338,6 +415,10 @@ class DirectoryPackageLoader(package_loader.MuranoPackageLoader):
             packages.add(folder)
             yield folder
 
+    def cleanup(self):
+        """A stub for possible cleanup"""
+        pass
+
 
 class CombinedPackageLoader(package_loader.MuranoPackageLoader):
     def __init__(self, murano_client_factory, tenant_id, root_loader=None):
@@ -376,12 +457,24 @@ class CombinedPackageLoader(package_loader.MuranoPackageLoader):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        self.api_loader.cleanup()
+        self.cleanup()
         return False
 
+    def cleanup(self):
+        """Calls cleanup method of all loaders we combine"""
+        self.api_loader.cleanup()
+        for d_loader in self.directory_loaders:
+            d_loader.cleanup()
+
 
 def get_class(package, name):
     version = package.runtime_version
     loader = yaql_yaml_loader.get_loader(version)
     contents, file_id = package.get_class(name)
     return loader(contents, file_id)
+
+
+def _with_to_generator(context_obj):
+    with context_obj as obj:
+        yield obj
+    yield
diff --git a/murano/tests/unit/engine/test_package_loader.py b/murano/tests/unit/engine/test_package_loader.py
index 320de7672..8a82a7a82 
100644 --- a/murano/tests/unit/engine/test_package_loader.py +++ b/murano/tests/unit/engine/test_package_loader.py @@ -11,6 +11,8 @@ # under the License. import os +import shutil +import tempfile import mock from oslo_config import cfg @@ -19,10 +21,106 @@ import semantic_version from murano.dsl import murano_package as dsl_package from murano.engine import package_loader from murano.tests.unit import base +from murano_tempest_tests import utils CONF = cfg.CONF +class TestPackageCache(base.MuranoTestCase): + + def setUp(self): + super(TestPackageCache, self).setUp() + + self.location = tempfile.mkdtemp() + CONF.set_override('enable_packages_cache', True, 'packages_opts') + self.old_location = CONF.packages_opts.packages_cache + CONF.set_override('packages_cache', self.location, 'packages_opts') + + self.murano_client = mock.MagicMock() + self.murano_client_factory = mock.MagicMock( + return_value=self.murano_client) + self.loader = package_loader.ApiPackageLoader( + self.murano_client_factory, 'test_tenant_id') + + def tearDown(self): + CONF.set_override('packages_cache', self.old_location, 'packages_opts') + shutil.rmtree(self.location, ignore_errors=True) + super(TestPackageCache, self).tearDown() + + def test_load_package(self): + fqn = 'io.murano.apps.test' + path, name = utils.compose_package( + 'test', + os.path.join(self.location, 'manifest.yaml'), + self.location, archive_dir=self.location) + with open(path) as f: + package_data = f.read() + spec = semantic_version.Spec('*') + + old_id, new_id = '123', '456' + package = mock.MagicMock() + package.fully_qualified_name = fqn + package.id = old_id + package.version = '0.0.1' + + self.murano_client.packages.filter = mock.MagicMock( + return_value=[package]) + self.murano_client.packages.download = mock.MagicMock( + return_value=package_data) + + # load the package + self.loader.load_class_package(fqn, spec) + + # assert that everything got created + self.assertTrue(os.path.isdir(os.path.join( + self.location, 
fqn)))
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version)))
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version, old_id)))
+        self.assertTrue(os.path.isfile(os.path.join(
+            self.location, fqn, package.version, old_id, 'manifest.yaml')))
+
+        # assert, that we called download
+        self.assertEqual(self.murano_client.packages.download.call_count, 1)
+
+        # now that the cache is in place, call it for the 2d time
+        self.loader._package_cache = {}
+        self.loader._class_cache = {}
+        self.loader.load_class_package(fqn, spec)
+
+        # check that we didn't download a thing
+        self.assertEqual(self.murano_client.packages.download.call_count, 1)
+
+        # changing id, new package would be downloaded.
+        package.id = new_id
+        self.loader._package_cache = {}
+        self.loader._class_cache = {}
+        self.loader.load_class_package(fqn, spec)
+
+        # check that we downloaded the new package version
+        self.assertEqual(self.murano_client.packages.download.call_count, 2)
+
+        # check that old directories got deleted
+        self.assertFalse(os.path.isdir(os.path.join(
+            self.location, fqn, package.version, old_id)))
+
+        # check that new directories got created correctly
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn)))
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version)))
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version, new_id)))
+        self.assertTrue(os.path.isfile(os.path.join(
+            self.location, fqn, package.version, new_id, 'manifest.yaml')))
+
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version)))
+        self.assertTrue(os.path.isdir(os.path.join(
+            self.location, fqn, package.version, new_id)))
+
+
 class TestCombinedPackageLoader(base.MuranoTestCase):
     @classmethod
     def setUpClass(cls):
diff --git a/murano/utils.py b/murano/utils.py
index a4e740e43..061cbfb41 100644
--- a/murano/utils.py
+++ b/murano/utils.py
@@ -12,8 +12,13 @@
 # License for the 
specific language governing permissions and limitations
 # under the License.
 
+import contextlib
+import errno
+import fcntl
 import functools
+import os
 
+from oslo_concurrency import lockutils
 from oslo_log import log as logging
 from webob import exc
 
@@ -124,3 +129,103 @@ def verify_session(func):
             raise exc.HTTPForbidden(explanation=msg)
         return func(self, request, *args, **kwargs)
     return __inner
+
+
+def ensure_tree(path):
+    """Create a directory (and any ancestor directories required).
+
+    :param path: Directory to create
+    :return: bool, whether the directory has actually been created
+    """
+    try:
+        os.makedirs(path)
+    except OSError as e:
+        if e.errno == errno.EEXIST:
+            if not os.path.isdir(path):
+                raise
+            else:
+                return False
+        elif e.errno == errno.EISDIR:
+            return False
+        else:
+            raise
+    else:
+        return True
+
+
+ExclusiveInterProcessLock = lockutils.InterProcessLock
+if os.name == 'nt':
+    # no shared locks on windows
+    SharedInterProcessLock = lockutils.InterProcessLock
+else:
+
+    class SharedInterProcessLock(lockutils.InterProcessLock):
+        def trylock(self):
+            # LOCK_SH instead of LOCK_EX
+            fcntl.lockf(self.lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)
+
+        def _do_open(self):
+            # the file has to be open in read mode, therefore this method has
+            # to be overridden
+            basedir = os.path.dirname(self.path)
+            if basedir:
+                made_basedir = ensure_tree(basedir)
+                if made_basedir:
+                    self.logger.debug(
+                        'Created lock base path `%s`', basedir)
+            # The code here is mostly copy-pasted from oslo_concurrency and
+            # fasteners. 
The file has to be open with read permissions to be + # suitable for shared locking + if self.lockfile is None or self.lockfile.closed: + try: + # ensure the file is there, but do not obtain an extra file + # descriptor, as closing it would release fcntl lock + fd = os.open(self.path, os.O_CREAT | os.O_EXCL) + os.close(fd) + except OSError: + pass + self.lockfile = open(self.path, 'r') + + +class ReaderWriterLock(lockutils.ReaderWriterLock): + + @contextlib.contextmanager + def write_lock(self, blocking=True): + """Context manager that grants a write lock. + Will wait until no active readers. Blocks readers after acquiring. + Raises a ``RuntimeError`` if an active reader attempts to acquire + a lock. + """ + timeout = None if blocking else 0.00001 + me = self._current_thread() + i_am_writer = self.is_writer(check_pending=False) + if self.is_reader() and not i_am_writer: + raise RuntimeError("Reader %s to writer privilege" + " escalation not allowed" % me) + if i_am_writer: + # Already the writer; this allows for basic reentrancy. + yield self + else: + with self._cond: + self._pending_writers.append(me) + while True: + # No readers, and no active writer, am I next?? + if len(self._readers) == 0 and self._writer is None: + if self._pending_writers[0] == me: + self._writer = self._pending_writers.popleft() + break + + # NOTE(kzaitsev): this actually means, that we can wait + # more than timeout times, since if we get True value we + # would get another spin inside of the while loop + # Should we do anything about it? 
+                    acquired = self._cond.wait(timeout)
+                    if not acquired:
+                        yield False
+                        return
+            try:
+                yield True
+            finally:
+                with self._cond:
+                    self._writer = None
+                    self._cond.notify_all()
diff --git a/releasenotes/notes/package_cache-68495dcde223c167.yaml b/releasenotes/notes/package_cache-68495dcde223c167.yaml
new file mode 100644
index 000000000..9a64cb823
--- /dev/null
+++ b/releasenotes/notes/package_cache-68495dcde223c167.yaml
@@ -0,0 +1,7 @@
+---
+features:
+  - Murano engine is now capable of caching packages on disk for reuse.
+    This is controlled by `packages_cache` directory path and boolean parameter
+    `enable_packages_cache` (true by default). The packages are cached in an
+    eventlet/inter-process safe manner and are cleaned up as soon as a newer
+    version of the package is available (unless it's used by ongoing deployment)