Sync processutils and lockutils from oslo with deps

This is a sync of processutils and lockutils from oslo-incubator
along with their dependencies.

The main target for the sync is to pick up the get_worker_count() method
in commit 85f178489a128a04a7ee3ed44018403caa109ef0 so that we can set
glance-api workers equal to the number of CPUs on the host.

lockutils is also copied over due to the nature running multiple
workers and how the glance functional tests are setup, it'd be
prudent to have the latest lockutils code since it hasn't been updated
in over six months.

The change to require posix_ipc is due to commit
edce46cf5efd6d738d379205f9bf526a498845e3 in lockutils.

Changes:

processutils
------------
85f1784 Move nova.utils.cpu_count() to processutils module
cdcc19c Mask passwords that are included in commands
8a0f567 Remove str() from LOG.* and exceptions
51778f9 Allow passing environment variables to execute()
fcf517d Update oslo log messages with translation domains
af41592 Catch OSError in processutils
f773ea2 Fix i18n problem in processutils module
8b2b0b7 Use hacking import_exceptions for gettextutils._
3b71f46 Fixed misspellings of common words
12bcdb7 Remove vim header
a4dab73 Correct execute() to check 0 in check_exit_code
d6a963e Fix processutils.execute errors on windows
aa5b658 Allow passing a logging level to processutils.execute
1a2df89 Enable H302 hacking check
7119e29 Enable hacking H404 test.
2f01388 Use Python 3.x compatible except construct

lockutils
---------
de4adbc pep8: fixed multiple violations
9e88af1 fixed typos found by RETF rules
fe3389e Improve help strings
f3f14c9 Fixed several typos
0f495ee Emit a log statement when releasing internal lock
f0dd798 Remove rendundant parentheses of cfg help strings
006d9a2 Allow external locks to work with threads
9b73c19 Re-enable file-based locking behavior
edce46c Use Posix IPC in lockutils
ac84a40 Update log translation domains
fcf517d Update oslo log messages with translation domains
37a1de8 Move the released file lock to the successful path
b0d0c33 Add remove external lock files API in lockutils
4f6190a Use threading.ThreadError instead of reraising IOError
195f0b1 Have the interprocess lock follow lock conventions
15cca4b lockutils: move directory creation in lock class
81fe39e lockutils: remove lock_path parameter
14d3669 lockutils: expand add_prefix
dc06d55 lockutils: do not grab the lock in creators
a0948cb lockutils: remove local usage
df8e051 lockutils: split code handling internal/external lock

log
---
109e325 Use oslo.messaging to publish log errors
de4adbc pep8: fixed multiple violations
eac71f5 Fix common.log.ContextFormatter for Python 3
d78b633 Fixes a simple spelling mistake
621d831 always log a traceback in the sys.excepthook
90ae24b Remove redundant default=None for config options
af36c2a Fix logging setup for Python 3.4
cdcc19c Mask passwords that are included in commands

excutils
--------
33a2cee save_and_reraise_exception: make logging respect the reraise parameter
fcf517d Update oslo log messages with translation domains

fileutils
---------
9c88dc3 file_open: fixed docstring to refer to open() instead of file()
6c7407b fileutils: port to Python 3
fcf517d Update oslo log messages with translation domains

Partial-Bug: #1333325

Change-Id: I6c9d8961af2bd3bbe4d149f21cd6d4fad676ba14
This commit is contained in:
Matt Riedemann 2014-06-25 11:03:29 -07:00
parent b7968cfa93
commit dde2cbafd3
6 changed files with 283 additions and 140 deletions

View File

@ -24,7 +24,7 @@ import traceback
import six import six
from glance.openstack.common.gettextutils import _ from glance.openstack.common.gettextutils import _LE
class save_and_reraise_exception(object): class save_and_reraise_exception(object):
@ -49,9 +49,22 @@ class save_and_reraise_exception(object):
decide_if_need_reraise() decide_if_need_reraise()
if not should_be_reraised: if not should_be_reraised:
ctxt.reraise = False ctxt.reraise = False
If another exception occurs and reraise flag is False,
the saved exception will not be logged.
If the caller wants to raise new exception during exception handling
he/she sets reraise to False initially with an ability to set it back to
True if needed::
except Exception:
with save_and_reraise_exception(reraise=False) as ctxt:
[if statements to determine whether to raise a new exception]
# Not raising a new exception, so reraise
ctxt.reraise = True
""" """
def __init__(self): def __init__(self, reraise=True):
self.reraise = True self.reraise = reraise
def __enter__(self): def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info() self.type_, self.value, self.tb, = sys.exc_info()
@ -59,10 +72,11 @@ class save_and_reraise_exception(object):
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None: if exc_type is not None:
logging.error(_('Original exception being dropped: %s'), if self.reraise:
traceback.format_exception(self.type_, logging.error(_LE('Original exception being dropped: %s'),
self.value, traceback.format_exception(self.type_,
self.tb)) self.value,
self.tb))
return False return False
if self.reraise: if self.reraise:
six.reraise(self.type_, self.value, self.tb) six.reraise(self.type_, self.value, self.tb)
@ -88,8 +102,8 @@ def forever_retry_uncaught_exceptions(infunc):
if (cur_time - last_log_time > 60 or if (cur_time - last_log_time > 60 or
this_exc_message != last_exc_message): this_exc_message != last_exc_message):
logging.exception( logging.exception(
_('Unexpected exception occurred %d time(s)... ' _LE('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count) 'retrying.') % exc_count)
last_log_time = cur_time last_log_time = cur_time
last_exc_message = this_exc_message last_exc_message = this_exc_message
exc_count = 0 exc_count = 0

View File

@ -13,14 +13,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib import contextlib
import errno import errno
import os import os
import tempfile import tempfile
from glance.openstack.common import excutils from glance.openstack.common import excutils
from glance.openstack.common.gettextutils import _
from glance.openstack.common import log as logging from glance.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -60,7 +58,7 @@ def read_cached_file(filename, force_reload=False):
cache_info = _FILE_CACHE.setdefault(filename, {}) cache_info = _FILE_CACHE.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0): if not cache_info or mtime > cache_info.get('mtime', 0):
LOG.debug(_("Reloading cached file %s") % filename) LOG.debug("Reloading cached file %s" % filename)
with open(filename) as fap: with open(filename) as fap:
cache_info['data'] = fap.read() cache_info['data'] = fap.read()
cache_info['mtime'] = mtime cache_info['mtime'] = mtime
@ -101,13 +99,13 @@ def remove_path_on_error(path, remove=delete_if_exists):
def file_open(*args, **kwargs): def file_open(*args, **kwargs):
"""Open file """Open file
see built-in file() documentation for more details see built-in open() documentation for more details
Note: The reason this is kept in a separate module is to easily Note: The reason this is kept in a separate module is to easily
be able to provide a stub module that doesn't alter system be able to provide a stub module that doesn't alter system
state at all (for unit tests) state at all (for unit tests)
""" """
return file(*args, **kwargs) return open(*args, **kwargs)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import contextlib import contextlib
import errno import errno
import fcntl
import functools import functools
import os import os
import shutil import shutil
@ -29,8 +29,7 @@ import weakref
from oslo.config import cfg from oslo.config import cfg
from glance.openstack.common import fileutils from glance.openstack.common import fileutils
from glance.openstack.common.gettextutils import _ from glance.openstack.common.gettextutils import _, _LE, _LI
from glance.openstack.common import local
from glance.openstack.common import log as logging from glance.openstack.common import log as logging
@ -39,10 +38,10 @@ LOG = logging.getLogger(__name__)
util_opts = [ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False, cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'), help='Enables or disables inter-process locks.'),
cfg.StrOpt('lock_path', cfg.StrOpt('lock_path',
default=os.environ.get("GLANCE_LOCK_PATH"), default=os.environ.get("GLANCE_LOCK_PATH"),
help=('Directory to use for lock files.')) help='Directory to use for lock files.')
] ]
@ -54,7 +53,7 @@ def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path) cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object): class _FileLock(object):
"""Lock implementation which allows multiple locks, working around """Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file not require any cleanup. Since the lock is always held on a file
@ -76,7 +75,13 @@ class _InterProcessLock(object):
self.lockfile = None self.lockfile = None
self.fname = name self.fname = name
def __enter__(self): def acquire(self):
basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
self.lockfile = open(self.fname, 'w') self.lockfile = open(self.fname, 'w')
while True: while True:
@ -86,23 +91,41 @@ class _InterProcessLock(object):
# Also upon reading the MSDN docs for locking(), it seems # Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism. # to have a laughable 10 attempts "blocking" mechanism.
self.trylock() self.trylock()
return self LOG.debug('Got file lock "%s"', self.fname)
return True
except IOError as e: except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN): if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables # external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning # updates - give it some time to prevent busy spinning
time.sleep(0.01) time.sleep(0.01)
else: else:
raise raise threading.ThreadError(_("Unable to acquire lock on"
" `%(filename)s` due to"
" %(exception)s") %
{
'filename': self.fname,
'exception': e,
})
def __exit__(self, exc_type, exc_val, exc_tb): def __enter__(self):
self.acquire()
return self
def release(self):
try: try:
self.unlock() self.unlock()
self.lockfile.close() self.lockfile.close()
LOG.debug('Released file lock "%s"', self.fname)
except IOError: except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"), LOG.exception(_LE("Could not release the acquired lock `%s`"),
self.fname) self.fname)
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
return os.path.exists(self.fname)
def trylock(self): def trylock(self):
raise NotImplementedError() raise NotImplementedError()
@ -110,7 +133,7 @@ class _InterProcessLock(object):
raise NotImplementedError() raise NotImplementedError()
class _WindowsLock(_InterProcessLock): class _WindowsLock(_FileLock):
def trylock(self): def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
@ -118,7 +141,7 @@ class _WindowsLock(_InterProcessLock):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock): class _FcntlLock(_FileLock):
def trylock(self): def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
@ -126,17 +149,121 @@ class _PosixLock(_InterProcessLock):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN) fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
class _PosixLock(object):
def __init__(self, name):
# Hash the name because it's not valid to have POSIX semaphore
# names with things like / in them. Then use base64 to encode
# the digest() instead taking the hexdigest() because the
# result is shorter and most systems can't have shm sempahore
# names longer than 31 characters.
h = hashlib.sha1()
h.update(name.encode('ascii'))
self.name = str((b'/' + base64.urlsafe_b64encode(
h.digest())).decode('ascii'))
def acquire(self, timeout=None):
self.semaphore = posix_ipc.Semaphore(self.name,
flags=posix_ipc.O_CREAT,
initial_value=1)
self.semaphore.acquire(timeout)
return self
def __enter__(self):
self.acquire()
return self
def release(self):
self.semaphore.release()
self.semaphore.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
def exists(self):
try:
semaphore = posix_ipc.Semaphore(self.name)
except posix_ipc.ExistentialError:
return False
else:
semaphore.close()
return True
if os.name == 'nt': if os.name == 'nt':
import msvcrt import msvcrt
InterProcessLock = _WindowsLock InterProcessLock = _WindowsLock
FileLock = _WindowsLock
else: else:
import fcntl import base64
import hashlib
import posix_ipc
InterProcessLock = _PosixLock InterProcessLock = _PosixLock
FileLock = _FcntlLock
_semaphores = weakref.WeakValueDictionary() _semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock() _semaphores_lock = threading.Lock()
def _get_lock_path(name, lock_file_prefix, lock_path=None):
# NOTE(mikal): the lock name cannot contain directory
# separators
name = name.replace(os.sep, '_')
if lock_file_prefix:
sep = '' if lock_file_prefix.endswith('-') else '-'
name = '%s%s%s' % (lock_file_prefix, sep, name)
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
# NOTE(bnemec): Create a fake lock path for posix locks so we don't
# unnecessarily raise the RequiredOptError below.
if InterProcessLock is not _PosixLock:
raise cfg.RequiredOptError('lock_path')
local_lock_path = 'posixlock:/'
return os.path.join(local_lock_path, name)
def external_lock(name, lock_file_prefix=None, lock_path=None):
LOG.debug('Attempting to grab external lock "%(lock)s"',
{'lock': name})
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
# NOTE(bnemec): If an explicit lock_path was passed to us then it
# means the caller is relying on file-based locking behavior, so
# we can't use posix locks for those calls.
if lock_path:
return FileLock(lock_file_path)
return InterProcessLock(lock_file_path)
def remove_external_lock_file(name, lock_file_prefix=None):
"""Remove an external lock file when it's not used anymore
This will be helpful when we have a lot of lock files
"""
with internal_lock(name):
lock_file_path = _get_lock_path(name, lock_file_prefix)
try:
os.remove(lock_file_path)
except OSError:
LOG.info(_LI('Failed to remove file %(file)s'),
{'file': lock_file_path})
def internal_lock(name):
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
LOG.debug('Got semaphore "%(lock)s"', {'lock': name})
return sem
@contextlib.contextmanager @contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None): def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock """Context based lock
@ -150,69 +277,18 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
:param external: The external keyword argument denotes whether this lock :param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock', workers both run a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time. external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
""" """
with _semaphores_lock: int_lock = internal_lock(name)
try: with int_lock:
sem = _semaphores[name] if external and not CONF.disable_process_locking:
except KeyError: ext_lock = external_lock(name, lock_file_prefix, lock_path)
sem = threading.Semaphore() with ext_lock:
_semaphores[name] = sem yield ext_lock
else:
with sem: yield int_lock
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) LOG.debug('Released semaphore "%(lock)s"', {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
@ -244,11 +320,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def inner(*args, **kwargs): def inner(*args, **kwargs):
try: try:
with lock(name, lock_file_prefix, external, lock_path): with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'), LOG.debug('Got semaphore / lock "%(function)s"',
{'function': f.__name__}) {'function': f.__name__})
return f(*args, **kwargs) return f(*args, **kwargs)
finally: finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'), LOG.debug('Semaphore / lock released "%(function)s"',
{'function': f.__name__}) {'function': f.__name__})
return inner return inner
return wrap return wrap

View File

@ -59,7 +59,10 @@ _SANITIZE_PATTERNS = []
_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', _FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(<%(key)s>).*?(</%(key)s>)', r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?[\'"])'
'.*?([\'"])',
r'(%(key)s\s*--?[A-z]+\s*).*?([\s])']
for key in _SANITIZE_KEYS: for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS: for pattern in _FORMAT_PATTERNS:
@ -89,7 +92,6 @@ logging_cli_opts = [
'files. For details about logging configuration files, ' 'files. For details about logging configuration files, '
'see the Python logging module documentation.'), 'see the Python logging module documentation.'),
cfg.StrOpt('log-format', cfg.StrOpt('log-format',
default=None,
metavar='FORMAT', metavar='FORMAT',
help='DEPRECATED. ' help='DEPRECATED. '
'A logging.Formatter log message format string which may ' 'A logging.Formatter log message format string which may '
@ -115,7 +117,7 @@ logging_cli_opts = [
default=False, default=False,
help='Use syslog for logging. ' help='Use syslog for logging. '
'Existing syslog format is DEPRECATED during I, ' 'Existing syslog format is DEPRECATED during I, '
'and will chang in J to honor RFC5424.'), 'and will change in J to honor RFC5424.'),
cfg.BoolOpt('use-syslog-rfc-format', cfg.BoolOpt('use-syslog-rfc-format',
# TODO(bogdando) remove or use True after existing # TODO(bogdando) remove or use True after existing
# syslog format deprecation in J # syslog format deprecation in J
@ -422,9 +424,7 @@ class JSONFormatter(logging.Formatter):
def _create_logging_excepthook(product_name): def _create_logging_excepthook(product_name):
def logging_excepthook(exc_type, value, tb): def logging_excepthook(exc_type, value, tb):
extra = {} extra = {'exc_info': (exc_type, value, tb)}
if CONF.verbose or CONF.debug:
extra['exc_info'] = (exc_type, value, tb)
getLogger(product_name).critical( getLogger(product_name).critical(
"".join(traceback.format_exception_only(exc_type, value)), "".join(traceback.format_exception_only(exc_type, value)),
**extra) **extra)
@ -462,9 +462,8 @@ def setup(product_name, version='unknown'):
def set_defaults(logging_context_format_string): def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts, cfg.set_defaults(
logging_context_format_string= log_opts, logging_context_format_string=logging_context_format_string)
logging_context_format_string)
def _find_facility_from_conf(): def _find_facility_from_conf():
@ -541,9 +540,14 @@ def _setup_logging_from_conf(project, version):
log_root.addHandler(streamlog) log_root.addHandler(streamlog)
if CONF.publish_errors: if CONF.publish_errors:
handler = importutils.import_object( try:
"glance.openstack.common.log_handler.PublishErrorsHandler", handler = importutils.import_object(
logging.ERROR) "glance.openstack.common.log_handler.PublishErrorsHandler",
logging.ERROR)
except ImportError:
handler = importutils.import_object(
"oslo.messaging.notify.log_handler.PublishErrorsHandler",
logging.ERROR)
log_root.addHandler(handler) log_root.addHandler(handler)
datefmt = CONF.log_date_format datefmt = CONF.log_date_format
@ -569,9 +573,15 @@ def _setup_logging_from_conf(project, version):
for pair in CONF.default_log_levels: for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=') mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod) logger = logging.getLogger(mod)
logger.setLevel(level) # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name
# to integer code.
if sys.version_info < (2, 7):
level = logging.getLevelName(level_name)
logger.setLevel(level)
else:
logger.setLevel(level_name)
_loggers = {} _loggers = {}
@ -660,14 +670,19 @@ class ContextFormatter(logging.Formatter):
record.__dict__[key] = '' record.__dict__[key] = ''
if record.__dict__.get('request_id'): if record.__dict__.get('request_id'):
self._fmt = CONF.logging_context_format_string fmt = CONF.logging_context_format_string
else: else:
self._fmt = CONF.logging_default_format_string fmt = CONF.logging_default_format_string
if (record.levelno == logging.DEBUG and if (record.levelno == logging.DEBUG and
CONF.logging_debug_format_suffix): CONF.logging_debug_format_suffix):
self._fmt += " " + CONF.logging_debug_format_suffix fmt += " " + CONF.logging_debug_format_suffix
if sys.version_info < (3, 2):
self._fmt = fmt
else:
self._style = logging.PercentStyle(fmt)
self._fmt = self._style._fmt
# Cache this on the record, Logger will respect our formatted copy # Cache this on the record, Logger will respect our formatted copy
if record.exc_info: if record.exc_info:
record.exc_text = self.formatException(record.exc_info, record) record.exc_text = self.formatException(record.exc_info, record)

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation. # Copyright 2011 OpenStack Foundation.
# All Rights Reserved. # All Rights Reserved.
# #
@ -19,6 +17,9 @@
System-level utilities and helper functions. System-level utilities and helper functions.
""" """
import errno
import logging as stdlib_logging
import multiprocessing
import os import os
import random import random
import shlex import shlex
@ -26,6 +27,7 @@ import signal
from eventlet.green import subprocess from eventlet.green import subprocess
from eventlet import greenthread from eventlet import greenthread
import six
from glance.openstack.common.gettextutils import _ from glance.openstack.common.gettextutils import _
from glance.openstack.common import log as logging from glance.openstack.common import log as logging
@ -54,11 +56,18 @@ class ProcessExecutionError(Exception):
self.description = description self.description = description
if description is None: if description is None:
description = "Unexpected error while running command." description = _("Unexpected error while running command.")
if exit_code is None: if exit_code is None:
exit_code = '-' exit_code = '-'
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" message = _('%(description)s\n'
% (description, cmd, exit_code, stdout, stderr)) 'Command: %(cmd)s\n'
'Exit code: %(exit_code)s\n'
'Stdout: %(stdout)r\n'
'Stderr: %(stderr)r') % {'description': description,
'cmd': cmd,
'exit_code': exit_code,
'stdout': stdout,
'stderr': stderr}
super(ProcessExecutionError, self).__init__(message) super(ProcessExecutionError, self).__init__(message)
@ -74,14 +83,17 @@ def _subprocess_setup():
def execute(*cmd, **kwargs): def execute(*cmd, **kwargs):
""" """Helper method to shell out and execute a command through subprocess.
Helper method to shell out and execute a command through subprocess with
optional retry. Allows optional retry.
:param cmd: Passed to subprocess.Popen. :param cmd: Passed to subprocess.Popen.
:type cmd: string :type cmd: string
:param process_input: Send to opened process. :param process_input: Send to opened process.
:type process_input: string :type process_input: string
:param env_variables: Environment variables and their values that
will be set for the process.
:type env_variables: dict
:param check_exit_code: Single bool, int, or list of allowed exit :param check_exit_code: Single bool, int, or list of allowed exit
codes. Defaults to [0]. Raise codes. Defaults to [0]. Raise
:class:`ProcessExecutionError` unless :class:`ProcessExecutionError` unless
@ -102,6 +114,9 @@ def execute(*cmd, **kwargs):
:param shell: whether or not there should be a shell used to :param shell: whether or not there should be a shell used to
execute this command. Defaults to false. execute this command. Defaults to false.
:type shell: boolean :type shell: boolean
:param loglevel: log level for execute commands.
:type loglevel: int. (Should be stdlib_logging.DEBUG or
stdlib_logging.INFO)
:returns: (stdout, stderr) from process execution :returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on :raises: :class:`UnknownArgumentError` on
receiving unknown arguments receiving unknown arguments
@ -109,6 +124,7 @@ def execute(*cmd, **kwargs):
""" """
process_input = kwargs.pop('process_input', None) process_input = kwargs.pop('process_input', None)
env_variables = kwargs.pop('env_variables', None)
check_exit_code = kwargs.pop('check_exit_code', [0]) check_exit_code = kwargs.pop('check_exit_code', [0])
ignore_exit_code = False ignore_exit_code = False
delay_on_retry = kwargs.pop('delay_on_retry', True) delay_on_retry = kwargs.pop('delay_on_retry', True)
@ -116,6 +132,7 @@ def execute(*cmd, **kwargs):
run_as_root = kwargs.pop('run_as_root', False) run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '') root_helper = kwargs.pop('root_helper', '')
shell = kwargs.pop('shell', False) shell = kwargs.pop('shell', False)
loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG)
if isinstance(check_exit_code, bool): if isinstance(check_exit_code, bool):
ignore_exit_code = not check_exit_code ignore_exit_code = not check_exit_code
@ -127,11 +144,11 @@ def execute(*cmd, **kwargs):
raise UnknownArgumentError(_('Got unknown keyword args ' raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs) 'to utils.execute: %r') % kwargs)
if run_as_root and os.geteuid() != 0: if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
if not root_helper: if not root_helper:
raise NoRootWrapSpecified( raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root ' message=_('Command requested root, but did not '
'helper.')) 'specify a root helper.'))
cmd = shlex.split(root_helper) + list(cmd) cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd) cmd = map(str, cmd)
@ -139,7 +156,8 @@ def execute(*cmd, **kwargs):
while attempts > 0: while attempts > 0:
attempts -= 1 attempts -= 1
try: try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) LOG.log(loglevel, 'Running cmd (subprocess): %s',
' '.join(logging.mask_password(cmd)))
_PIPE = subprocess.PIPE # pylint: disable=E1101 _PIPE = subprocess.PIPE # pylint: disable=E1101
if os.name == 'nt': if os.name == 'nt':
@ -155,28 +173,37 @@ def execute(*cmd, **kwargs):
stderr=_PIPE, stderr=_PIPE,
close_fds=close_fds, close_fds=close_fds,
preexec_fn=preexec_fn, preexec_fn=preexec_fn,
shell=shell) shell=shell,
env=env_variables)
result = None result = None
if process_input is not None: for _i in six.moves.range(20):
result = obj.communicate(process_input) # NOTE(russellb) 20 is an arbitrary number of retries to
else: # prevent any chance of looping forever here.
result = obj.communicate() try:
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
except OSError as e:
if e.errno in (errno.EAGAIN, errno.EINTR):
continue
raise
break
obj.stdin.close() # pylint: disable=E1101 obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101 _returncode = obj.returncode # pylint: disable=E1101
if _returncode: LOG.log(loglevel, 'Result was %s' % _returncode)
LOG.debug(_('Result was %s') % _returncode) if not ignore_exit_code and _returncode not in check_exit_code:
if not ignore_exit_code and _returncode not in check_exit_code: (stdout, stderr) = result
(stdout, stderr) = result raise ProcessExecutionError(exit_code=_returncode,
raise ProcessExecutionError(exit_code=_returncode, stdout=stdout,
stdout=stdout, stderr=stderr,
stderr=stderr, cmd=' '.join(cmd))
cmd=' '.join(cmd))
return result return result
except ProcessExecutionError: except ProcessExecutionError:
if not attempts: if not attempts:
raise raise
else: else:
LOG.debug(_('%r failed. Retrying.'), cmd) LOG.log(loglevel, '%r failed. Retrying.', cmd)
if delay_on_retry: if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0) greenthread.sleep(random.randint(20, 200) / 100.0)
finally: finally:
@ -187,8 +214,7 @@ def execute(*cmd, **kwargs):
def trycmd(*args, **kwargs): def trycmd(*args, **kwargs):
""" """A wrapper around execute() to more easily handle warnings and errors.
A wrapper around execute() to more easily handle warnings and errors.
Returns an (out, err) tuple of strings containing the output of Returns an (out, err) tuple of strings containing the output of
the command's stdout and stderr. If 'err' is not empty then the the command's stdout and stderr. If 'err' is not empty then the
@ -203,8 +229,8 @@ def trycmd(*args, **kwargs):
try: try:
out, err = execute(*args, **kwargs) out, err = execute(*args, **kwargs)
failed = False failed = False
except ProcessExecutionError, exn: except ProcessExecutionError as exn:
out, err = '', str(exn) out, err = '', six.text_type(exn)
failed = True failed = True
if not failed and discard_warnings and err: if not failed and discard_warnings and err:
@ -216,7 +242,7 @@ def trycmd(*args, **kwargs):
def ssh_execute(ssh, cmd, process_input=None, def ssh_execute(ssh, cmd, process_input=None,
addl_env=None, check_exit_code=True): addl_env=None, check_exit_code=True):
LOG.debug(_('Running cmd (SSH): %s'), cmd) LOG.debug('Running cmd (SSH): %s', cmd)
if addl_env: if addl_env:
raise InvalidArgumentError(_('Environment not supported over SSH')) raise InvalidArgumentError(_('Environment not supported over SSH'))
@ -237,7 +263,7 @@ def ssh_execute(ssh, cmd, process_input=None,
# exit_status == -1 if no exit code was returned # exit_status == -1 if no exit code was returned
if exit_status != -1: if exit_status != -1:
LOG.debug(_('Result was %s') % exit_status) LOG.debug('Result was %s' % exit_status)
if check_exit_code and exit_status != 0: if check_exit_code and exit_status != 0:
raise ProcessExecutionError(exit_code=exit_status, raise ProcessExecutionError(exit_code=exit_status,
stdout=stdout, stdout=stdout,
@ -245,3 +271,15 @@ def ssh_execute(ssh, cmd, process_input=None,
cmd=cmd) cmd=cmd)
return (stdout, stderr) return (stdout, stderr)
def get_worker_count():
"""Utility to get the default worker count.
@return: The number of CPUs if that can be determined, else a default
worker count of 1 is returned.
"""
try:
return multiprocessing.cpu_count()
except NotImplementedError:
return 1

View File

@ -23,6 +23,8 @@ ordereddict
oslo.config>=1.2.1 oslo.config>=1.2.1
stevedore>=0.14 stevedore>=0.14
netaddr>=0.7.6 netaddr>=0.7.6
# For openstack/common/lockutils
posix_ipc
# For Swift storage backend. # For Swift storage backend.
python-swiftclient>=2.0.2 python-swiftclient>=2.0.2