Merge "Updated cache module and its dependencies"

This commit is contained in:
Jenkins 2014-10-21 10:22:56 +00:00 committed by Gerrit Code Review
commit 63d2ec323e
4 changed files with 136 additions and 125 deletions

View File

@ -14,9 +14,10 @@
import collections
from oslo.utils import timeutils
from neutron.openstack.common.cache import backends
from neutron.openstack.common import lockutils
from neutron.openstack.common import timeutils
class MemoryBackend(backends.BaseCache):
@ -147,7 +148,7 @@ class MemoryBackend(backends.BaseCache):
try:
# NOTE(flaper87): Keys with ttl == 0
# don't exist in the _keys_expires dict
self._keys_expires[value[0]].remove(value[1])
self._keys_expires[value[0]].remove(key)
except (KeyError, ValueError):
pass

View File

@ -26,9 +26,9 @@ class BaseCache(object):
:params parsed_url: Parsed url object.
:params options: A dictionary with configuration parameters
for the cache. For example:
- default_ttl: An integer defining the default ttl
for keys.
for the cache. For example:
- default_ttl: An integer defining the default ttl for keys.
"""
def __init__(self, parsed_url, options=None):
@ -43,20 +43,17 @@ class BaseCache(object):
def set(self, key, value, ttl, not_exists=False):
"""Sets or updates a cache entry
NOTE: Thread-safety is required and has to be
guaranteed by the backend implementation.
.. note:: Thread-safety is required and has to be guaranteed by the
backend implementation.
:params key: Item key as string.
:type key: `unicode string`
:params value: Value to assign to the key. This
can be anything that is handled
by the current backend.
:params ttl: Key's timeout in seconds. 0 means
no timeout.
:params value: Value to assign to the key. This can be anything that
is handled by the current backend.
:params ttl: Key's timeout in seconds. 0 means no timeout.
:type ttl: int
:params not_exists: If True, the key will be set
if it doesn't exist. Otherwise,
it'll always be set.
:params not_exists: If True, the key will be set if it doesn't exist.
Otherwise, it'll always be set.
:type not_exists: bool
:returns: True if the operation succeeds, False otherwise.
@ -74,9 +71,8 @@ class BaseCache(object):
:params key: Item key as string.
:type key: `unicode string`
:params value: Value to assign to the key. This
can be anything that is handled
by the current backend.
:params value: Value to assign to the key. This can be anything that
is handled by the current backend.
"""
try:
return self[key]
@ -91,15 +87,14 @@ class BaseCache(object):
def get(self, key, default=None):
"""Gets one item from the cache
NOTE: Thread-safety is required and it has to be
guaranteed by the backend implementation.
.. note:: Thread-safety is required and it has to be guaranteed
by the backend implementation.
:params key: Key for the item to retrieve
from the cache.
:params key: Key for the item to retrieve from the cache.
:params default: The default value to return.
:returns: `key`'s value in the cache if it exists,
otherwise `default` should be returned.
:returns: `key`'s value in the cache if it exists, otherwise
`default` should be returned.
"""
return self._get(key, default)
@ -115,8 +110,8 @@ class BaseCache(object):
def __delitem__(self, key):
"""Removes an item from cache.
NOTE: Thread-safety is required and it has to be
guaranteed by the backend implementation.
.. note:: Thread-safety is required and it has to be guaranteed by
the backend implementation.
:params key: The key to remove.
@ -130,8 +125,8 @@ class BaseCache(object):
def clear(self):
"""Removes all items from the cache.
NOTE: Thread-safety is required and it has to be
guaranteed by the backend implementation.
.. note:: Thread-safety is required and it has to be guaranteed by
the backend implementation.
"""
return self._clear()
@ -143,9 +138,8 @@ class BaseCache(object):
"""Increments the value for a key
:params key: The key for the value to be incremented
:params delta: Number of units by which to increment
the value. Pass a negative number to
decrement the value.
:params delta: Number of units by which to increment the value.
Pass a negative number to decrement the value.
:returns: The new value
"""
@ -158,10 +152,8 @@ class BaseCache(object):
def append_tail(self, key, tail):
"""Appends `tail` to `key`'s value.
:params key: The key of the value to which
`tail` should be appended.
:params tail: The list of values to append to the
original.
:params key: The key of the value to which `tail` should be appended.
:params tail: The list of values to append to the original.
:returns: The new value
"""
@ -181,10 +173,8 @@ class BaseCache(object):
def append(self, key, value):
"""Appends `value` to `key`'s value.
:params key: The key of the value to which
`tail` should be appended.
:params value: The value to append to the
original.
:params key: The key of the value to which `tail` should be appended.
:params value: The value to append to the original.
:returns: The new value
"""
@ -196,8 +186,7 @@ class BaseCache(object):
:params key: The key to verify.
:returns: True if the key exists,
otherwise False.
:returns: True if the key exists, otherwise False.
"""
@abc.abstractmethod
@ -209,9 +198,8 @@ class BaseCache(object):
"""Gets keys' value from cache
:params keys: List of keys to retrieve.
:params default: The default value to return
for each key that is not in
the cache.
:params default: The default value to return for each key that is not
in the cache.
:returns: A generator of (key, value)
"""
@ -227,13 +215,12 @@ class BaseCache(object):
def set_many(self, data, ttl=None):
"""Puts several items into the cache at once
Depending on the backend, this operation may or may
not be efficient. The default implementation calls
set for each (key, value) pair passed, other backends
support set_many operations as part of their protocols.
Depending on the backend, this operation may or may not be efficient.
The default implementation calls set for each (key, value) pair
passed, other backends support set_many operations as part of their
protocols.
:params data: A dictionary like {key: val} to store
in the cache.
:params data: A dictionary like {key: val} to store in the cache.
:params ttl: Key's timeout in seconds.
"""

View File

@ -24,7 +24,7 @@ from six.moves.urllib import parse
from stevedore import driver
def _get_olso_configs():
def _get_oslo_configs():
"""Returns the oslo.config options to register."""
# NOTE(flaper87): Oslo config should be
# optional. Instead of doing try / except
@ -45,7 +45,7 @@ def register_oslo_configs(conf):
:params conf: Config object.
:type conf: `cfg.ConfigOptions`
"""
conf.register_opts(_get_olso_configs())
conf.register_opts(_get_oslo_configs())
def get_cache(url='memory://'):

View File

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import functools
import logging
import os
import shutil
import subprocess
@ -29,9 +29,7 @@ import weakref
from oslo.config import cfg
from neutron.openstack.common import fileutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
from neutron.openstack.common._i18n import _, _LE, _LI
LOG = logging.getLogger(__name__)
@ -39,10 +37,10 @@ LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path',
default=os.environ.get("NEUTRON_LOCK_PATH"),
help=('Directory to use for lock files.'))
help='Directory to use for lock files.')
]
@ -54,7 +52,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
@ -76,7 +74,13 @@ class _InterProcessLock(object):
self.lockfile = None
self.fname = name
def __enter__(self):
def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w')
while True:
@ -86,23 +90,39 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{'filename': self.fname,
'exception': e})
def __exit__(self, exc_type, exc_val, exc_tb):
def __enter__(self):
self.acquire()
return self
def release(self):
try:
self.unlock()
self.lockfile.close()
LOG.debug('Released file lock "%s"', self.fname)
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self):
raise NotImplementedError()
@ -110,7 +130,7 @@ class _InterProcessLock(object):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
class _WindowsLock(_FileLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@ -118,7 +138,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
class _FcntlLock(_FileLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@ -131,12 +151,63 @@ if os.name == 'nt':
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
InterProcessLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
return os.path.join(local_lock_path, name)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove an external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
LOG.debug('Using existing semaphore "%s"', name)
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug('Created new semaphore "%s"', name)
return sem
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
@ -152,67 +223,19 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
should work across multiple processes. This means that if two different
workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
int_lock = internal_lock(name)
with int_lock:
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
ext_lock = external_lock(name, lock_file_prefix, lock_path)
with ext_lock:
yield ext_lock
else:
yield sem
yield int_lock
finally:
local.strong_store.locks_held.remove(name)
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
@ -244,11 +267,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs):
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__})
return inner
return wrap