From dde2cbafd33b69024e36c741402c8c130ab761e8 Mon Sep 17 00:00:00 2001 From: Matt Riedemann Date: Wed, 25 Jun 2014 11:03:29 -0700 Subject: [PATCH] Sync processutils and lockutils from oslo with deps This is a sync of processutils and lockutils from oslo-incubator along with their dependencies. The main target for the sync is to pick up the get_worker_count() method in commit 85f178489a128a04a7ee3ed44018403caa109ef0 so that we can set glance-api workers equal to the number of CPUs on the host. lockutils is also copied over due to the nature running multiple workers and how the glance functional tests are setup, it'd be prudent to have the latest lockutils code since it hasn't been updated in over six months. The change to require posix_ipc is due to commit edce46cf5efd6d738d379205f9bf526a498845e3 in lockutils. Changes: processutils ------------ 85f1784 Move nova.utils.cpu_count() to processutils module cdcc19c Mask passwords that are included in commands 8a0f567 Remove str() from LOG.* and exceptions 51778f9 Allow passing environment variables to execute() fcf517d Update oslo log messages with translation domains af41592 Catch OSError in processutils f773ea2 Fix i18n problem in processutils module 8b2b0b7 Use hacking import_exceptions for gettextutils._ 3b71f46 Fixed misspellings of common words 12bcdb7 Remove vim header a4dab73 Correct execute() to check 0 in check_exit_code d6a963e Fix processutils.execute errors on windows aa5b658 Allow passing a logging level to processutils.execute 1a2df89 Enable H302 hacking check 7119e29 Enable hacking H404 test. 2f01388 Use Python 3.x compatible except construct lockutils --------- de4adbc pep8: fixed multiple violations 9e88af1 fixed typos found by RETF rules fe3389e Improve help strings f3f14c9 Fixed several typos 0f495ee Emit a log statement when releasing internal lock f0dd798 Remove rendundant parentheses of cfg help strings 006d9a2 Allow external locks to work with threads 9b73c19 Re-enable file-based locking behavior edce46c Use Posix IPC in lockutils ac84a40 Update log translation domains fcf517d Update oslo log messages with translation domains 37a1de8 Move the released file lock to the successful path b0d0c33 Add remove external lock files API in lockutils 4f6190a Use threading.ThreadError instead of reraising IOError 195f0b1 Have the interprocess lock follow lock conventions 15cca4b lockutils: move directory creation in lock class 81fe39e lockutils: remove lock_path parameter 14d3669 lockutils: expand add_prefix dc06d55 lockutils: do not grab the lock in creators a0948cb lockutils: remove local usage df8e051 lockutils: split code handling internal/external lock log --- 109e325 Use oslo.messaging to publish log errors de4adbc pep8: fixed multiple violations eac71f5 Fix common.log.ContextFormatter for Python 3 d78b633 Fixes a simple spelling mistake 621d831 always log a traceback in the sys.excepthook 90ae24b Remove redundant default=None for config options af36c2a Fix logging setup for Python 3.4 cdcc19c Mask passwords that are included in commands excutils -------- 33a2cee save_and_reraise_exception: make logging respect the reraise parameter fcf517d Update oslo log messages with translation domains fileutils --------- 9c88dc3 file_open: fixed docstring to refer to open() instead of file() 6c7407b fileutils: port to Python 3 fcf517d Update oslo log messages with translation domains Partial-Bug: #1333325 Change-Id: I6c9d8961af2bd3bbe4d149f21cd6d4fad676ba14 --- glance/openstack/common/excutils.py | 32 +++- glance/openstack/common/fileutils.py | 8 +- glance/openstack/common/lockutils.py | 230 ++++++++++++++++-------- glance/openstack/common/log.py | 49 +++-- glance/openstack/common/processutils.py | 102 +++++++---- requirements.txt | 2 + 6 files changed, 283 insertions(+), 140 deletions(-) diff --git a/glance/openstack/common/excutils.py b/glance/openstack/common/excutils.py index 44246bcf9d..e6d8314859 100644 --- a/glance/openstack/common/excutils.py +++ b/glance/openstack/common/excutils.py @@ -24,7 +24,7 @@ import traceback import six -from glance.openstack.common.gettextutils import _ +from glance.openstack.common.gettextutils import _LE class save_and_reraise_exception(object): @@ -49,9 +49,22 @@ class save_and_reraise_exception(object): decide_if_need_reraise() if not should_be_reraised: ctxt.reraise = False + + If another exception occurs and reraise flag is False, + the saved exception will not be logged. + + If the caller wants to raise new exception during exception handling + he/she sets reraise to False initially with an ability to set it back to + True if needed:: + + except Exception: + with save_and_reraise_exception(reraise=False) as ctxt: + [if statements to determine whether to raise a new exception] + # Not raising a new exception, so reraise + ctxt.reraise = True """ - def __init__(self): - self.reraise = True + def __init__(self, reraise=True): + self.reraise = reraise def __enter__(self): self.type_, self.value, self.tb, = sys.exc_info() @@ -59,10 +72,11 @@ class save_and_reraise_exception(object): def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: - logging.error(_('Original exception being dropped: %s'), - traceback.format_exception(self.type_, - self.value, - self.tb)) + if self.reraise: + logging.error(_LE('Original exception being dropped: %s'), + traceback.format_exception(self.type_, + self.value, + self.tb)) return False if self.reraise: six.reraise(self.type_, self.value, self.tb) @@ -88,8 +102,8 @@ def forever_retry_uncaught_exceptions(infunc): if (cur_time - last_log_time > 60 or this_exc_message != last_exc_message): logging.exception( - _('Unexpected exception occurred %d time(s)... ' - 'retrying.') % exc_count) + _LE('Unexpected exception occurred %d time(s)... ' + 'retrying.') % exc_count) last_log_time = cur_time last_exc_message = this_exc_message exc_count = 0 diff --git a/glance/openstack/common/fileutils.py b/glance/openstack/common/fileutils.py index e1746e4e13..dc3228fa9e 100644 --- a/glance/openstack/common/fileutils.py +++ b/glance/openstack/common/fileutils.py @@ -13,14 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. - import contextlib import errno import os import tempfile from glance.openstack.common import excutils -from glance.openstack.common.gettextutils import _ from glance.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -60,7 +58,7 @@ def read_cached_file(filename, force_reload=False): cache_info = _FILE_CACHE.setdefault(filename, {}) if not cache_info or mtime > cache_info.get('mtime', 0): - LOG.debug(_("Reloading cached file %s") % filename) + LOG.debug("Reloading cached file %s" % filename) with open(filename) as fap: cache_info['data'] = fap.read() cache_info['mtime'] = mtime @@ -101,13 +99,13 @@ def remove_path_on_error(path, remove=delete_if_exists): def file_open(*args, **kwargs): """Open file - see built-in file() documentation for more details + see built-in open() documentation for more details Note: The reason this is kept in a separate module is to easily be able to provide a stub module that doesn't alter system state at all (for unit tests) """ - return file(*args, **kwargs) + return open(*args, **kwargs) def write_to_tempfile(content, path=None, suffix='', prefix='tmp'): diff --git a/glance/openstack/common/lockutils.py b/glance/openstack/common/lockutils.py index d5aa3ce30c..60e49f5e96 100644 --- a/glance/openstack/common/lockutils.py +++ b/glance/openstack/common/lockutils.py @@ -13,9 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. - import contextlib import errno +import fcntl import functools import os import shutil @@ -29,8 +29,7 @@ import weakref from oslo.config import cfg from glance.openstack.common import fileutils -from glance.openstack.common.gettextutils import _ -from glance.openstack.common import local +from glance.openstack.common.gettextutils import _, _LE, _LI from glance.openstack.common import log as logging @@ -39,10 +38,10 @@ LOG = logging.getLogger(__name__) util_opts = [ cfg.BoolOpt('disable_process_locking', default=False, - help='Whether to disable inter-process locks'), + help='Enables or disables inter-process locks.'), cfg.StrOpt('lock_path', default=os.environ.get("GLANCE_LOCK_PATH"), - help=('Directory to use for lock files.')) + help='Directory to use for lock files.') ] @@ -54,7 +53,7 @@ def set_defaults(lock_path): cfg.set_defaults(util_opts, lock_path=lock_path) -class _InterProcessLock(object): +class _FileLock(object): """Lock implementation which allows multiple locks, working around issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does not require any cleanup. Since the lock is always held on a file @@ -76,7 +75,13 @@ class _InterProcessLock(object): self.lockfile = None self.fname = name - def __enter__(self): + def acquire(self): + basedir = os.path.dirname(self.fname) + + if not os.path.exists(basedir): + fileutils.ensure_tree(basedir) + LOG.info(_LI('Created lock path: %s'), basedir) + self.lockfile = open(self.fname, 'w') while True: @@ -86,23 +91,41 @@ class _InterProcessLock(object): # Also upon reading the MSDN docs for locking(), it seems # to have a laughable 10 attempts "blocking" mechanism. self.trylock() - return self + LOG.debug('Got file lock "%s"', self.fname) + return True except IOError as e: if e.errno in (errno.EACCES, errno.EAGAIN): # external locks synchronise things like iptables # updates - give it some time to prevent busy spinning time.sleep(0.01) else: - raise + raise threading.ThreadError(_("Unable to acquire lock on" + " `%(filename)s` due to" + " %(exception)s") % + { + 'filename': self.fname, + 'exception': e, + }) - def __exit__(self, exc_type, exc_val, exc_tb): + def __enter__(self): + self.acquire() + return self + + def release(self): try: self.unlock() self.lockfile.close() + LOG.debug('Released file lock "%s"', self.fname) except IOError: - LOG.exception(_("Could not release the acquired lock `%s`"), + LOG.exception(_LE("Could not release the acquired lock `%s`"), self.fname) + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def exists(self): + return os.path.exists(self.fname) + def trylock(self): raise NotImplementedError() @@ -110,7 +133,7 @@ class _InterProcessLock(object): raise NotImplementedError() -class _WindowsLock(_InterProcessLock): +class _WindowsLock(_FileLock): def trylock(self): msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) @@ -118,7 +141,7 @@ class _WindowsLock(_InterProcessLock): msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) -class _PosixLock(_InterProcessLock): +class _FcntlLock(_FileLock): def trylock(self): fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) @@ -126,17 +149,121 @@ class _PosixLock(_InterProcessLock): fcntl.lockf(self.lockfile, fcntl.LOCK_UN) +class _PosixLock(object): + def __init__(self, name): + # Hash the name because it's not valid to have POSIX semaphore + # names with things like / in them. Then use base64 to encode + # the digest() instead taking the hexdigest() because the + # result is shorter and most systems can't have shm sempahore + # names longer than 31 characters. + h = hashlib.sha1() + h.update(name.encode('ascii')) + self.name = str((b'/' + base64.urlsafe_b64encode( + h.digest())).decode('ascii')) + + def acquire(self, timeout=None): + self.semaphore = posix_ipc.Semaphore(self.name, + flags=posix_ipc.O_CREAT, + initial_value=1) + self.semaphore.acquire(timeout) + return self + + def __enter__(self): + self.acquire() + return self + + def release(self): + self.semaphore.release() + self.semaphore.close() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() + + def exists(self): + try: + semaphore = posix_ipc.Semaphore(self.name) + except posix_ipc.ExistentialError: + return False + else: + semaphore.close() + return True + + if os.name == 'nt': import msvcrt InterProcessLock = _WindowsLock + FileLock = _WindowsLock else: - import fcntl + import base64 + import hashlib + + import posix_ipc InterProcessLock = _PosixLock + FileLock = _FcntlLock _semaphores = weakref.WeakValueDictionary() _semaphores_lock = threading.Lock() +def _get_lock_path(name, lock_file_prefix, lock_path=None): + # NOTE(mikal): the lock name cannot contain directory + # separators + name = name.replace(os.sep, '_') + if lock_file_prefix: + sep = '' if lock_file_prefix.endswith('-') else '-' + name = '%s%s%s' % (lock_file_prefix, sep, name) + + local_lock_path = lock_path or CONF.lock_path + + if not local_lock_path: + # NOTE(bnemec): Create a fake lock path for posix locks so we don't + # unnecessarily raise the RequiredOptError below. + if InterProcessLock is not _PosixLock: + raise cfg.RequiredOptError('lock_path') + local_lock_path = 'posixlock:/' + + return os.path.join(local_lock_path, name) + + +def external_lock(name, lock_file_prefix=None, lock_path=None): + LOG.debug('Attempting to grab external lock "%(lock)s"', + {'lock': name}) + + lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path) + + # NOTE(bnemec): If an explicit lock_path was passed to us then it + # means the caller is relying on file-based locking behavior, so + # we can't use posix locks for those calls. + if lock_path: + return FileLock(lock_file_path) + return InterProcessLock(lock_file_path) + + +def remove_external_lock_file(name, lock_file_prefix=None): + """Remove an external lock file when it's not used anymore + This will be helpful when we have a lot of lock files + """ + with internal_lock(name): + lock_file_path = _get_lock_path(name, lock_file_prefix) + try: + os.remove(lock_file_path) + except OSError: + LOG.info(_LI('Failed to remove file %(file)s'), + {'file': lock_file_path}) + + +def internal_lock(name): + with _semaphores_lock: + try: + sem = _semaphores[name] + except KeyError: + sem = threading.Semaphore() + _semaphores[name] = sem + + LOG.debug('Got semaphore "%(lock)s"', {'lock': name}) + return sem + + @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None): """Context based lock @@ -150,69 +277,18 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None): :param external: The external keyword argument denotes whether this lock should work across multiple processes. This means that if two different - workers both run a a method decorated with @synchronized('mylock', + workers both run a method decorated with @synchronized('mylock', external=True), only one of them will execute at a time. - - :param lock_path: The lock_path keyword argument is used to specify a - special location for external lock files to live. If nothing is set, then - CONF.lock_path is used as a default. """ - with _semaphores_lock: - try: - sem = _semaphores[name] - except KeyError: - sem = threading.Semaphore() - _semaphores[name] = sem - - with sem: - LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name}) - - # NOTE(mikal): I know this looks odd - if not hasattr(local.strong_store, 'locks_held'): - local.strong_store.locks_held = [] - local.strong_store.locks_held.append(name) - - try: - if external and not CONF.disable_process_locking: - LOG.debug(_('Attempting to grab file lock "%(lock)s"'), - {'lock': name}) - - # We need a copy of lock_path because it is non-local - local_lock_path = lock_path or CONF.lock_path - if not local_lock_path: - raise cfg.RequiredOptError('lock_path') - - if not os.path.exists(local_lock_path): - fileutils.ensure_tree(local_lock_path) - LOG.info(_('Created lock path: %s'), local_lock_path) - - def add_prefix(name, prefix): - if not prefix: - return name - sep = '' if prefix.endswith('-') else '-' - return '%s%s%s' % (prefix, sep, name) - - # NOTE(mikal): the lock name cannot contain directory - # separators - lock_file_name = add_prefix(name.replace(os.sep, '_'), - lock_file_prefix) - - lock_file_path = os.path.join(local_lock_path, lock_file_name) - - try: - lock = InterProcessLock(lock_file_path) - with lock as lock: - LOG.debug(_('Got file lock "%(lock)s" at %(path)s'), - {'lock': name, 'path': lock_file_path}) - yield lock - finally: - LOG.debug(_('Released file lock "%(lock)s" at %(path)s'), - {'lock': name, 'path': lock_file_path}) - else: - yield sem - - finally: - local.strong_store.locks_held.remove(name) + int_lock = internal_lock(name) + with int_lock: + if external and not CONF.disable_process_locking: + ext_lock = external_lock(name, lock_file_prefix, lock_path) + with ext_lock: + yield ext_lock + else: + yield int_lock + LOG.debug('Released semaphore "%(lock)s"', {'lock': name}) def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): @@ -244,11 +320,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None): def inner(*args, **kwargs): try: with lock(name, lock_file_prefix, external, lock_path): - LOG.debug(_('Got semaphore / lock "%(function)s"'), + LOG.debug('Got semaphore / lock "%(function)s"', {'function': f.__name__}) return f(*args, **kwargs) finally: - LOG.debug(_('Semaphore / lock released "%(function)s"'), + LOG.debug('Semaphore / lock released "%(function)s"', {'function': f.__name__}) return inner return wrap diff --git a/glance/openstack/common/log.py b/glance/openstack/common/log.py index 247d033832..c31f3aa46a 100644 --- a/glance/openstack/common/log.py +++ b/glance/openstack/common/log.py @@ -59,7 +59,10 @@ _SANITIZE_PATTERNS = [] _FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])', r'(<%(key)s>).*?()', r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])', - r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])'] + r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])', + r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?[\'"])' + '.*?([\'"])', + r'(%(key)s\s*--?[A-z]+\s*).*?([\s])'] for key in _SANITIZE_KEYS: for pattern in _FORMAT_PATTERNS: @@ -89,7 +92,6 @@ logging_cli_opts = [ 'files. For details about logging configuration files, ' 'see the Python logging module documentation.'), cfg.StrOpt('log-format', - default=None, metavar='FORMAT', help='DEPRECATED. ' 'A logging.Formatter log message format string which may ' @@ -115,7 +117,7 @@ logging_cli_opts = [ default=False, help='Use syslog for logging. ' 'Existing syslog format is DEPRECATED during I, ' - 'and will chang in J to honor RFC5424.'), + 'and will change in J to honor RFC5424.'), cfg.BoolOpt('use-syslog-rfc-format', # TODO(bogdando) remove or use True after existing # syslog format deprecation in J @@ -422,9 +424,7 @@ class JSONFormatter(logging.Formatter): def _create_logging_excepthook(product_name): def logging_excepthook(exc_type, value, tb): - extra = {} - if CONF.verbose or CONF.debug: - extra['exc_info'] = (exc_type, value, tb) + extra = {'exc_info': (exc_type, value, tb)} getLogger(product_name).critical( "".join(traceback.format_exception_only(exc_type, value)), **extra) @@ -462,9 +462,8 @@ def setup(product_name, version='unknown'): def set_defaults(logging_context_format_string): - cfg.set_defaults(log_opts, - logging_context_format_string= - logging_context_format_string) + cfg.set_defaults( + log_opts, logging_context_format_string=logging_context_format_string) def _find_facility_from_conf(): @@ -541,9 +540,14 @@ def _setup_logging_from_conf(project, version): log_root.addHandler(streamlog) if CONF.publish_errors: - handler = importutils.import_object( - "glance.openstack.common.log_handler.PublishErrorsHandler", - logging.ERROR) + try: + handler = importutils.import_object( + "glance.openstack.common.log_handler.PublishErrorsHandler", + logging.ERROR) + except ImportError: + handler = importutils.import_object( + "oslo.messaging.notify.log_handler.PublishErrorsHandler", + logging.ERROR) log_root.addHandler(handler) datefmt = CONF.log_date_format @@ -569,9 +573,15 @@ def _setup_logging_from_conf(project, version): for pair in CONF.default_log_levels: mod, _sep, level_name = pair.partition('=') - level = logging.getLevelName(level_name) logger = logging.getLogger(mod) - logger.setLevel(level) + # NOTE(AAzza) in python2.6 Logger.setLevel doesn't convert string name + # to integer code. + if sys.version_info < (2, 7): + level = logging.getLevelName(level_name) + logger.setLevel(level) + else: + logger.setLevel(level_name) + _loggers = {} @@ -660,14 +670,19 @@ class ContextFormatter(logging.Formatter): record.__dict__[key] = '' if record.__dict__.get('request_id'): - self._fmt = CONF.logging_context_format_string + fmt = CONF.logging_context_format_string else: - self._fmt = CONF.logging_default_format_string + fmt = CONF.logging_default_format_string if (record.levelno == logging.DEBUG and CONF.logging_debug_format_suffix): - self._fmt += " " + CONF.logging_debug_format_suffix + fmt += " " + CONF.logging_debug_format_suffix + if sys.version_info < (3, 2): + self._fmt = fmt + else: + self._style = logging.PercentStyle(fmt) + self._fmt = self._style._fmt # Cache this on the record, Logger will respect our formatted copy if record.exc_info: record.exc_text = self.formatException(record.exc_info, record) diff --git a/glance/openstack/common/processutils.py b/glance/openstack/common/processutils.py index 303a696f64..5bf03ccbef 100644 --- a/glance/openstack/common/processutils.py +++ b/glance/openstack/common/processutils.py @@ -1,5 +1,3 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # @@ -19,6 +17,9 @@ System-level utilities and helper functions. """ +import errno +import logging as stdlib_logging +import multiprocessing import os import random import shlex @@ -26,6 +27,7 @@ import signal from eventlet.green import subprocess from eventlet import greenthread +import six from glance.openstack.common.gettextutils import _ from glance.openstack.common import log as logging @@ -54,11 +56,18 @@ class ProcessExecutionError(Exception): self.description = description if description is None: - description = "Unexpected error while running command." + description = _("Unexpected error while running command.") if exit_code is None: exit_code = '-' - message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" - % (description, cmd, exit_code, stdout, stderr)) + message = _('%(description)s\n' + 'Command: %(cmd)s\n' + 'Exit code: %(exit_code)s\n' + 'Stdout: %(stdout)r\n' + 'Stderr: %(stderr)r') % {'description': description, + 'cmd': cmd, + 'exit_code': exit_code, + 'stdout': stdout, + 'stderr': stderr} super(ProcessExecutionError, self).__init__(message) @@ -74,14 +83,17 @@ def _subprocess_setup(): def execute(*cmd, **kwargs): - """ - Helper method to shell out and execute a command through subprocess with - optional retry. + """Helper method to shell out and execute a command through subprocess. + + Allows optional retry. :param cmd: Passed to subprocess.Popen. :type cmd: string :param process_input: Send to opened process. :type process_input: string + :param env_variables: Environment variables and their values that + will be set for the process. + :type env_variables: dict :param check_exit_code: Single bool, int, or list of allowed exit codes. Defaults to [0]. Raise :class:`ProcessExecutionError` unless @@ -102,6 +114,9 @@ def execute(*cmd, **kwargs): :param shell: whether or not there should be a shell used to execute this command. Defaults to false. :type shell: boolean + :param loglevel: log level for execute commands. + :type loglevel: int. (Should be stdlib_logging.DEBUG or + stdlib_logging.INFO) :returns: (stdout, stderr) from process execution :raises: :class:`UnknownArgumentError` on receiving unknown arguments @@ -109,6 +124,7 @@ def execute(*cmd, **kwargs): """ process_input = kwargs.pop('process_input', None) + env_variables = kwargs.pop('env_variables', None) check_exit_code = kwargs.pop('check_exit_code', [0]) ignore_exit_code = False delay_on_retry = kwargs.pop('delay_on_retry', True) @@ -116,6 +132,7 @@ def execute(*cmd, **kwargs): run_as_root = kwargs.pop('run_as_root', False) root_helper = kwargs.pop('root_helper', '') shell = kwargs.pop('shell', False) + loglevel = kwargs.pop('loglevel', stdlib_logging.DEBUG) if isinstance(check_exit_code, bool): ignore_exit_code = not check_exit_code @@ -127,11 +144,11 @@ def execute(*cmd, **kwargs): raise UnknownArgumentError(_('Got unknown keyword args ' 'to utils.execute: %r') % kwargs) - if run_as_root and os.geteuid() != 0: + if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0: if not root_helper: raise NoRootWrapSpecified( - message=('Command requested root, but did not specify a root ' - 'helper.')) + message=_('Command requested root, but did not ' + 'specify a root helper.')) cmd = shlex.split(root_helper) + list(cmd) cmd = map(str, cmd) @@ -139,7 +156,8 @@ def execute(*cmd, **kwargs): while attempts > 0: attempts -= 1 try: - LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd)) + LOG.log(loglevel, 'Running cmd (subprocess): %s', + ' '.join(logging.mask_password(cmd))) _PIPE = subprocess.PIPE # pylint: disable=E1101 if os.name == 'nt': @@ -155,28 +173,37 @@ def execute(*cmd, **kwargs): stderr=_PIPE, close_fds=close_fds, preexec_fn=preexec_fn, - shell=shell) + shell=shell, + env=env_variables) result = None - if process_input is not None: - result = obj.communicate(process_input) - else: - result = obj.communicate() + for _i in six.moves.range(20): + # NOTE(russellb) 20 is an arbitrary number of retries to + # prevent any chance of looping forever here. + try: + if process_input is not None: + result = obj.communicate(process_input) + else: + result = obj.communicate() + except OSError as e: + if e.errno in (errno.EAGAIN, errno.EINTR): + continue + raise + break obj.stdin.close() # pylint: disable=E1101 _returncode = obj.returncode # pylint: disable=E1101 - if _returncode: - LOG.debug(_('Result was %s') % _returncode) - if not ignore_exit_code and _returncode not in check_exit_code: - (stdout, stderr) = result - raise ProcessExecutionError(exit_code=_returncode, - stdout=stdout, - stderr=stderr, - cmd=' '.join(cmd)) + LOG.log(loglevel, 'Result was %s' % _returncode) + if not ignore_exit_code and _returncode not in check_exit_code: + (stdout, stderr) = result + raise ProcessExecutionError(exit_code=_returncode, + stdout=stdout, + stderr=stderr, + cmd=' '.join(cmd)) return result except ProcessExecutionError: if not attempts: raise else: - LOG.debug(_('%r failed. Retrying.'), cmd) + LOG.log(loglevel, '%r failed. Retrying.', cmd) if delay_on_retry: greenthread.sleep(random.randint(20, 200) / 100.0) finally: @@ -187,8 +214,7 @@ def execute(*cmd, **kwargs): def trycmd(*args, **kwargs): - """ - A wrapper around execute() to more easily handle warnings and errors. + """A wrapper around execute() to more easily handle warnings and errors. Returns an (out, err) tuple of strings containing the output of the command's stdout and stderr. If 'err' is not empty then the @@ -203,8 +229,8 @@ def trycmd(*args, **kwargs): try: out, err = execute(*args, **kwargs) failed = False - except ProcessExecutionError, exn: - out, err = '', str(exn) + except ProcessExecutionError as exn: + out, err = '', six.text_type(exn) failed = True if not failed and discard_warnings and err: @@ -216,7 +242,7 @@ def trycmd(*args, **kwargs): def ssh_execute(ssh, cmd, process_input=None, addl_env=None, check_exit_code=True): - LOG.debug(_('Running cmd (SSH): %s'), cmd) + LOG.debug('Running cmd (SSH): %s', cmd) if addl_env: raise InvalidArgumentError(_('Environment not supported over SSH')) @@ -237,7 +263,7 @@ def ssh_execute(ssh, cmd, process_input=None, # exit_status == -1 if no exit code was returned if exit_status != -1: - LOG.debug(_('Result was %s') % exit_status) + LOG.debug('Result was %s' % exit_status) if check_exit_code and exit_status != 0: raise ProcessExecutionError(exit_code=exit_status, stdout=stdout, @@ -245,3 +271,15 @@ def ssh_execute(ssh, cmd, process_input=None, cmd=cmd) return (stdout, stderr) + + +def get_worker_count(): + """Utility to get the default worker count. + + @return: The number of CPUs if that can be determined, else a default + worker count of 1 is returned. + """ + try: + return multiprocessing.cpu_count() + except NotImplementedError: + return 1 diff --git a/requirements.txt b/requirements.txt index 8f0ead5e39..cef9faf74b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,8 @@ ordereddict oslo.config>=1.2.1 stevedore>=0.14 netaddr>=0.7.6 +# For openstack/common/lockutils +posix_ipc # For Swift storage backend. python-swiftclient>=2.0.2