Switch oslo-incubator to use oslo.utils and remove old modules

keystone client/middleware, nova all have tests that mock
utcnow when we introduce oslo.utils into those projects, we have
test failures because of references to copies of timeutils/utcnow
in for example memorycache. So the best way to deal with it is to
ensure that all code in oslo-incubator refer to oslo.utils. Added a
note in the deprecated code so when folks sync, they will be aware
of it.

Per the new process, we will not be keeping graduated files around
on the master branch of incubator.

Closes-Bug: #1354521
Closes-Bug: #1371713
Change-Id: If80e66c7dae1bfcda8e884c57b7086f34d026040
This commit is contained in:
Davanum Srinivas 2014-08-15 16:20:30 -04:00
parent f563cf4a1a
commit 6ff6b4b4a5
32 changed files with 31 additions and 2400 deletions

View File

@ -97,12 +97,6 @@ M:
S: Orphan
F: eventlet_backdoor.py
== excutils ==
M:
S: Obsolete
F: excutils.py
== fileutils ==
M: Zhongyue Luo <zhongyue.nah@intel.com>
@ -139,12 +133,6 @@ M: Zhongyue Luo <zhongyue.nah@intel.com>
S: Maintained
F: imageutils.py
== importutils ==
M:
S: Obsolete
F: importutils.py
== jsonutils ==
M:
@ -194,12 +182,6 @@ M: Gordon Chung <gord@live.ca>
S: Obsolete
F: middleware/
== network_utils ==
M:
S: Orphan
F: network_utils.py
== periodic_task ==
M: Michael Still <mikal@stillhq.com>
@ -250,12 +232,6 @@ M:
S: Orphan
F: sslutils.py
== strutils ==
M: Flavio Percoco <flavio@redhat.com>
S: Obsolete
F: strutils.py
== systemd ==
M: Alan Pevec <apevec@redhat.com>
@ -274,18 +250,6 @@ M:
S: Orphan
F: threadgroup.py
== timeutils ==
M: Zhongyue Luo <zhongyue.nah@intel.com>
S: Obsolete
F: timeutils.py
== units ==
M: ChangBo Guo <glongwave@gmail.com>
S: Obsolete
F: units.py
== uuidutils ==
M: Zhongyue Luo <zhongyue.nah@intel.com>

View File

@ -26,12 +26,12 @@ Base utilities to build API operation managers and objects on top of.
import abc
import copy
from oslo.utils import strutils
import six
from six.moves.urllib import parse
from openstack.common.apiclient import exceptions
from openstack.common.gettextutils import _
from openstack.common import strutils
def getid(obj):

View File

@ -33,11 +33,11 @@ try:
except ImportError:
import json
from oslo.utils import importutils
import requests
from openstack.common.apiclient import exceptions
from openstack.common.gettextutils import _
from openstack.common import importutils
_logger = logging.getLogger(__name__)

View File

@ -14,9 +14,10 @@
import collections
from oslo.utils import timeutils
from openstack.common.cache import backends
from openstack.common import lockutils
from openstack.common import timeutils
class MemoryBackend(backends.BaseCache):

View File

@ -24,13 +24,14 @@ import os
import sys
import textwrap
from oslo.utils import encodeutils
from oslo.utils import strutils
import prettytable
import six
from six import moves
from openstack.common.apiclient import exceptions
from openstack.common.gettextutils import _
from openstack.common import strutils
from openstack.common import uuidutils
@ -173,7 +174,7 @@ def print_list(objs, fields, formatters=None, sortby_index=0,
row.append(data)
pt.add_row(row)
print(strutils.safe_encode(pt.get_string(**kwargs)))
print(encodeutils.safe_encode(pt.get_string(**kwargs)))
def print_dict(dct, dict_property="Property", wrap=0):
@ -201,7 +202,7 @@ def print_dict(dct, dict_property="Property", wrap=0):
col1 = ''
else:
pt.add_row([k, v])
print(strutils.safe_encode(pt.get_string()))
print(encodeutils.safe_encode(pt.get_string()))
def get_password(max_password_prompts=3):
@ -246,9 +247,9 @@ def find_resource(manager, name_or_id, **find_args):
# now try to get entity as uuid
try:
if six.PY2:
tmp_id = strutils.safe_encode(name_or_id)
tmp_id = encodeutils.safe_encode(name_or_id)
else:
tmp_id = strutils.safe_decode(name_or_id)
tmp_id = encodeutils.safe_decode(name_or_id)
if uuidutils.is_uuid_like(tmp_id):
return manager.get(tmp_id)

View File

@ -28,11 +28,11 @@ import sys
import textwrap
from oslo.config import cfg
from oslo.utils import importutils
import six
import stevedore.named
from openstack.common import gettextutils
from openstack.common import importutils
gettextutils.install('oslo')

View File

@ -16,10 +16,10 @@ import base64
from Crypto.Hash import HMAC
from Crypto import Random
from oslo.utils import importutils
import six
from openstack.common.gettextutils import _
from openstack.common import importutils
bchr = six.int2byte

View File

@ -1,113 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception related utilities.
"""
import logging
import sys
import time
import traceback
import six
from openstack.common.gettextutils import _LE
class save_and_reraise_exception(object):
"""Save current exception, run some code and then re-raise.
In some cases the exception context can be cleared, resulting in None
being attempted to be re-raised after an exception handler is run. This
can happen when eventlet switches greenthreads or when running an
exception handler, code raises and catches an exception. In both
cases the exception context will be cleared.
To work around this, we save the exception state, run handler code, and
then re-raise the original exception. If another exception occurs, the
saved exception is logged and the new exception is re-raised.
In some cases the caller may not want to re-raise the exception, and
for those circumstances this context provides a reraise flag that
can be used to suppress the exception. For example::
except Exception:
with save_and_reraise_exception() as ctxt:
decide_if_need_reraise()
if not should_be_reraised:
ctxt.reraise = False
If another exception occurs and reraise flag is False,
the saved exception will not be logged.
If the caller wants to raise new exception during exception handling
he/she sets reraise to False initially with an ability to set it back to
True if needed::
except Exception:
with save_and_reraise_exception(reraise=False) as ctxt:
[if statements to determine whether to raise a new exception]
# Not raising a new exception, so reraise
ctxt.reraise = True
"""
def __init__(self, reraise=True):
self.reraise = reraise
def __enter__(self):
self.type_, self.value, self.tb, = sys.exc_info()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
if self.reraise:
logging.error(_LE('Original exception being dropped: %s'),
traceback.format_exception(self.type_,
self.value,
self.tb))
return False
if self.reraise:
six.reraise(self.type_, self.value, self.tb)
def forever_retry_uncaught_exceptions(infunc):
def inner_func(*args, **kwargs):
last_log_time = 0
last_exc_message = None
exc_count = 0
while True:
try:
return infunc(*args, **kwargs)
except Exception as exc:
this_exc_message = six.u(str(exc))
if this_exc_message == last_exc_message:
exc_count += 1
else:
exc_count = 1
# Do not log any more frequently than once a minute unless
# the exception message changes
cur_time = int(time.time())
if (cur_time - last_log_time > 60 or
this_exc_message != last_exc_message):
logging.exception(
_LE('Unexpected exception occurred %d time(s)... '
'retrying.') % exc_count)
last_log_time = cur_time
last_exc_message = this_exc_message
exc_count = 0
# This should be a very rare event. In case it isn't, do
# a sleep.
time.sleep(1)
return inner_func

View File

@ -18,7 +18,8 @@ import errno
import os
import tempfile
from openstack.common import excutils
from oslo.utils import excutils
from openstack.common import log as logging
LOG = logging.getLogger(__name__)

View File

@ -21,8 +21,9 @@ Helper methods to deal with images.
import re
from oslo.utils import strutils
from openstack.common.gettextutils import _
from openstack.common import strutils
class QemuImgInfo(object):

View File

@ -1,73 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Import related utilities and helper functions.
"""
import sys
import traceback
def import_class(import_str):
"""Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
__import__(mod_str)
try:
return getattr(sys.modules[mod_str], class_str)
except AttributeError:
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))
def import_object(import_str, *args, **kwargs):
"""Import a class and return an instance of it."""
return import_class(import_str)(*args, **kwargs)
def import_object_ns(name_space, import_str, *args, **kwargs):
"""Tries to import object from default namespace.
Imports a class and return an instance of it, first by trying
to find the class in a default namespace, then failing back to
a full path if not found in the default namespace.
"""
import_value = "%s.%s" % (name_space, import_str)
try:
return import_class(import_value)(*args, **kwargs)
except ImportError:
return import_class(import_str)(*args, **kwargs)
def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
def import_versioned_module(version, submodule=None):
module = 'oslo.v%s' % version
if submodule:
module = '.'.join((module, submodule))
return import_module(module)
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@ -56,13 +56,13 @@ if sys.version_info < (2, 7):
else:
import json
from oslo.utils import encodeutils
from oslo.utils import importutils
from oslo.utils import timeutils
import six
import six.moves.xmlrpc_client as xmlrpclib
from openstack.common import gettextutils
from openstack.common import importutils
from openstack.common import strutils
from openstack.common import timeutils
netaddr = importutils.try_import("netaddr")
@ -185,7 +185,7 @@ def dump(obj, fp, *args, **kwargs):
def loads(s, encoding='utf-8', **kwargs):
return json.loads(strutils.safe_decode(s, encoding), **kwargs)
return json.loads(encodeutils.safe_decode(s, encoding), **kwargs)
def load(fp, encoding='utf-8', **kwargs):

View File

@ -38,18 +38,15 @@ import sys
import traceback
from oslo.config import cfg
from oslo.utils import importutils
import six
from six import moves
_PY26 = sys.version_info[0:2] == (2, 6)
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
from openstack.common import local
# NOTE(flaper87): Pls, remove when graduating this module
# from the incubator.
from openstack.common.strutils import mask_password # noqa
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

View File

@ -17,8 +17,7 @@
"""Super simple fake memcache client."""
from oslo.config import cfg
from openstack.common import timeutils
from oslo.utils import timeutils
memcache_opts = [
cfg.ListOpt('memcached_servers',

View File

@ -1,163 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network-related utilities and helper functions.
"""
import logging
import socket
from six.moves.urllib import parse
from openstack.common.gettextutils import _LW
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None):
"""Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
>>> parse_host_port(None)
(None, None)
"""
if not address:
return (None, None)
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))
class ModifiedSplitResult(parse.SplitResult):
"""Split results class for urlsplit."""
# NOTE(dims): The functions below are needed for Python 2.6.x.
# We can remove these when we drop support for 2.6.x.
@property
def hostname(self):
netloc = self.netloc.split('@', 1)[-1]
host, port = parse_host_port(netloc)
return host
@property
def port(self):
netloc = self.netloc.split('@', 1)[-1]
host, port = parse_host_port(netloc)
return port
def urlsplit(url, scheme='', allow_fragments=True):
"""Parse a URL using urlparse.urlsplit(), splitting query and fragments.
This function papers over Python issue9374 when needed.
The parameters are the same as urlparse.urlsplit.
"""
scheme, netloc, path, query, fragment = parse.urlsplit(
url, scheme, allow_fragments)
if allow_fragments and '#' in path:
path, fragment = path.split('#', 1)
if '?' in path:
path, query = path.split('?', 1)
return ModifiedSplitResult(scheme, netloc,
path, query, fragment)
def set_tcp_keepalive(sock, tcp_keepalive=True,
tcp_keepidle=None,
tcp_keepalive_interval=None,
tcp_keepalive_count=None):
"""Set values for tcp keepalive parameters
This function configures tcp keepalive parameters if users wish to do
so.
:param tcp_keepalive: Boolean, turn on or off tcp_keepalive. If users are
not sure, this should be True, and default values will be used.
:param tcp_keepidle: time to wait before starting to send keepalive probes
:param tcp_keepalive_interval: time between successive probes, once the
initial wait time is over
:param tcp_keepalive_count: number of probes to send before the connection
is killed
"""
# NOTE(praneshp): Despite keepalive being a tcp concept, the level is
# still SOL_SOCKET. This is a quirk.
if isinstance(tcp_keepalive, bool):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, tcp_keepalive)
else:
raise TypeError("tcp_keepalive must be a boolean")
if not tcp_keepalive:
return
# These options aren't available in the OS X version of eventlet,
# Idle + Count * Interval effectively gives you the total timeout.
if tcp_keepidle is not None:
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
tcp_keepidle)
else:
LOG.warning(_LW('tcp_keepidle not available on your system'))
if tcp_keepalive_interval is not None:
if hasattr(socket, 'TCP_KEEPINTVL'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPINTVL,
tcp_keepalive_interval)
else:
LOG.warning(_LW('tcp_keepintvl not available on your system'))
if tcp_keepalive_count is not None:
if hasattr(socket, 'TCP_KEEPCNT'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPCNT,
tcp_keepalive_count)
else:
LOG.warning(_LW('tcp_keepknt not available on your system'))

View File

@ -27,10 +27,10 @@ import signal
from eventlet.green import subprocess
from eventlet import greenthread
from oslo.utils import strutils
import six
from openstack.common.gettextutils import _
from openstack.common import strutils
LOG = logging.getLogger(__name__)

View File

@ -19,12 +19,12 @@
import datetime
from oslo.config import cfg
from oslo.utils import importutils
from oslo.utils import timeutils
import six
from openstack.common.gettextutils import _, _LE
from openstack.common import importutils
from openstack.common import log as logging
from openstack.common import timeutils
LOG = logging.getLogger(__name__)

View File

@ -56,11 +56,12 @@ import os
import signal
import sys
from oslo.utils import timeutils
from openstack.common.report.generators import conf as cgen
from openstack.common.report.generators import threading as tgen
from openstack.common.report.generators import version as pgen
from openstack.common.report import report
from openstack.common import timeutils
class GuruMeditation(object):

View File

@ -15,7 +15,7 @@
import operator
from openstack.common import strutils
from oslo.utils import strutils
# 1. The following operations are supported:
# =, s==, s!=, s>=, s>, s<=, s<, <in>, <is>, <or>, ==, !=, >=, <=

View File

@ -1,316 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import math
import re
import sys
import unicodedata
import six
from openstack.common.gettextutils import _
UNIT_PREFIX_EXPONENT = {
'k': 1,
'K': 1,
'Ki': 1,
'M': 2,
'Mi': 2,
'G': 3,
'Gi': 3,
'T': 4,
'Ti': 4,
}
UNIT_SYSTEM_INFO = {
'IEC': (1024, re.compile(r'(^[-+]?\d*\.?\d+)([KMGT]i?)?(b|bit|B)$')),
'SI': (1000, re.compile(r'(^[-+]?\d*\.?\d+)([kMGT])?(b|bit|B)$')),
}
TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")
# NOTE(flaper87): The following globals are used by `mask_password`
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password', 'admin_password']
# NOTE(ldbragst): Let's build a list of regex objects using the list of
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
# to the list of _SANITIZE_KEYS and we can generate regular expressions
# for XML and JSON automatically.
_SANITIZE_PATTERNS_2 = []
_SANITIZE_PATTERNS_1 = []
# NOTE(amrith): Some regular expressions have only one parameter, some
# have two parameters. Use different lists of patterns here.
_FORMAT_PATTERNS_1 = [r'(%(key)s\s*[=]\s*)[^\s^\'^\"]+']
_FORMAT_PATTERNS_2 = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(%(key)s\s+[\"\']).*?([\"\'])',
r'([-]{2}%(key)s\s+)[^\'^\"^=^\s]+([\s]*)',
r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])',
r'([\'"].*?%(key)s[\'"]\s*,\s*\'--?[A-z]+\'\s*,\s*u?'
'[\'"]).*?([\'"])',
r'(%(key)s\s*--?[A-z]+\s*)\S+(\s*)']
for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS_2:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS_2.append(reg_ex)
for pattern in _FORMAT_PATTERNS_1:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS_1.append(reg_ex)
def int_from_bool_as_string(subject):
"""Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject, strict=False, default=False):
"""Interpret a string as a boolean.
A case-insensitive match is performed such that strings matching 't',
'true', 'on', 'y', 'yes', or '1' are considered True and, when
`strict=False`, anything else returns the value specified by 'default'.
Useful for JSON-decoded stuff and config file parsing.
If `strict=True`, unrecognized values, including None, will raise a
ValueError which is useful when parsing values passed in from an API call.
Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
"""
if not isinstance(subject, six.string_types):
subject = six.text_type(subject)
lowered = subject.strip().lower()
if lowered in TRUE_STRINGS:
return True
elif lowered in FALSE_STRINGS:
return False
elif strict:
acceptable = ', '.join(
"'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
msg = _("Unrecognized value '%(val)s', acceptable values are:"
" %(acceptable)s") % {'val': subject,
'acceptable': acceptable}
raise ValueError(msg)
else:
return default
def safe_decode(text, incoming=None, errors='strict'):
"""Decodes incoming text/bytes string using `incoming` if they're not
already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a unicode `incoming` encoded
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, six.text_type):
return text
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)
def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""Encodes incoming text/bytes string using `encoding`.
If incoming is not specified, text is expected to be encoded with
current python's default encoding. (`sys.getdefaultencoding`)
:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a bytestring `encoding` encoded
representation of it.
:raises TypeError: If text is not an instance of str
"""
if not isinstance(text, (six.string_types, six.binary_type)):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
if isinstance(text, six.text_type):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors)
return text.encode(encoding, errors)
else:
return text
def string_to_bytes(text, unit_system='IEC', return_int=False):
"""Converts a string into an float representation of bytes.
The units supported for IEC ::
Kb(it), Kib(it), Mb(it), Mib(it), Gb(it), Gib(it), Tb(it), Tib(it)
KB, KiB, MB, MiB, GB, GiB, TB, TiB
The units supported for SI ::
kb(it), Mb(it), Gb(it), Tb(it)
kB, MB, GB, TB
Note that the SI unit system does not support capital letter 'K'
:param text: String input for bytes size conversion.
:param unit_system: Unit system for byte size conversion.
:param return_int: If True, returns integer representation of text
in bytes. (default: decimal)
:returns: Numerical representation of text in bytes.
:raises ValueError: If text has an invalid value.
"""
try:
base, reg_ex = UNIT_SYSTEM_INFO[unit_system]
except KeyError:
msg = _('Invalid unit system: "%s"') % unit_system
raise ValueError(msg)
match = reg_ex.match(text)
if match:
magnitude = float(match.group(1))
unit_prefix = match.group(2)
if match.group(3) in ['b', 'bit']:
magnitude /= 8
else:
msg = _('Invalid string format: %s') % text
raise ValueError(msg)
if not unit_prefix:
res = magnitude
else:
res = magnitude * pow(base, UNIT_PREFIX_EXPONENT[unit_prefix])
if return_int:
return int(math.ceil(res))
return res
def to_slug(value, incoming=None, errors="strict"):
"""Normalize string.
Convert to lowercase, remove non-word characters, and convert spaces
to hyphens.
Inspired by Django's `slugify` filter.
:param value: Text to slugify
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: slugified unicode representation of `value`
:raises TypeError: If text is not an instance of str
"""
value = safe_decode(value, incoming, errors)
# NOTE(aababilov): no need to use safe_(encode|decode) here:
# encodings are always "ascii", error handling is always "ignore"
# and types are always known (first: unicode; second: str)
value = unicodedata.normalize("NFKD", value).encode(
"ascii", "ignore").decode("ascii")
value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
return SLUGIFY_HYPHENATE_RE.sub("-", value)
def mask_password(message, secret="***"):
"""Replace password with 'secret' in message.
:param message: The string which includes security information.
:param secret: value with which to replace passwords.
:returns: The unicode value of message with the password fields masked.
For example:
>>> mask_password("'adminPass' : 'aaaaa'")
"'adminPass' : '***'"
>>> mask_password("'admin_pass' : 'aaaaa'")
"'admin_pass' : '***'"
>>> mask_password('"password" : "aaaaa"')
'"password" : "***"'
>>> mask_password("'original_password' : 'aaaaa'")
"'original_password' : '***'"
>>> mask_password("u'original_password' : u'aaaaa'")
"u'original_password' : u'***'"
"""
try:
message = six.text_type(message)
except UnicodeDecodeError:
# NOTE(jecarey): Temporary fix to handle cases where message is a
# byte string. A better solution will be provided in Kilo.
pass
# NOTE(ldbragst): Check to see if anything in message contains any key
# specified in _SANITIZE_KEYS, if not then just return the message since
# we don't have to mask any passwords.
if not any(key in message for key in _SANITIZE_KEYS):
return message
substitute = r'\g<1>' + secret + r'\g<2>'
for pattern in _SANITIZE_PATTERNS_2:
message = re.sub(pattern, substitute, message)
substitute = r'\g<1>' + secret
for pattern in _SANITIZE_PATTERNS_1:
message = re.sub(pattern, substitute, message)
return message

View File

@ -1,210 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
import iso8601
import six
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(six.text_type(e))
except TypeError as e:
raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, six.string_types):
before = parse_strtime(before).replace(tzinfo=None)
else:
before = before.replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, six.string_types):
after = parse_strtime(after).replace(tzinfo=None)
else:
after = after.replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns an iso8601 formatted date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert utcnow.override_time is not None
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times.
"""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""Return the difference between two timing objects.
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
return total_seconds(delta)
def total_seconds(delta):
"""Return the total seconds of datetime.timedelta object.
Compute total seconds of datetime.timedelta, datetime.timedelta
doesn't have method total_seconds in Python2.6, calculate it manually.
"""
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""Determines if time is going to happen in the next window seconds.
:param dt: the time
:param window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

View File

@ -1,38 +0,0 @@
# Copyright 2013 IBM Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit constants
"""
# Binary unit constants.
Ki = 1024
Mi = 1024 ** 2
Gi = 1024 ** 3
Ti = 1024 ** 4
Pi = 1024 ** 5
Ei = 1024 ** 6
Zi = 1024 ** 7
Yi = 1024 ** 8
# Decimal unit constants.
k = 1000
M = 1000 ** 2
G = 1000 ** 3
T = 1000 ** 4
P = 1000 ** 5
E = 1000 ** 6
Z = 1000 ** 7
Y = 1000 ** 8

View File

@ -13,6 +13,7 @@ kombu>=2.4.8
lockfile>=0.8
netaddr>=0.7.12
oslo.config>=1.4.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
PasteDeploy>=1.5.0
pbr>=0.6,!=0.7,<1.0
posix_ipc

View File

@ -14,6 +14,7 @@ kombu>=2.4.8
lockfile>=0.8
netaddr>=0.7.12
oslo.config>=1.4.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
PasteDeploy>=1.5.0
posix_ipc
PrettyTable>=0.7,<0.8

View File

@ -150,7 +150,7 @@ class TestGuruMeditationReport(utils.BaseTestCase):
os.kill(os.getpid(), signal.SIGUSR1)
self.assertIn('Guru Meditation', sys.stderr.getvalue())
@mock.patch('openstack.common.timeutils.strtime', return_value="NOW")
@mock.patch('oslo.utils.timeutils.strtime', return_value="NOW")
def test_register_autorun_log_dir(self, mock_strtime):
log_dir = self.useFixture(fixtures.TempDir()).path
gmr.TextGuruMeditation.setup_autorun(

View File

@ -1,198 +0,0 @@
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import time
import mock
from oslotest import base as test_base
from oslotest import moxstubout
from openstack.common import excutils
mox = moxstubout.mox
class SaveAndReraiseTest(test_base.BaseTestCase):
def test_save_and_reraise_exception(self):
e = None
msg = 'foo'
try:
try:
raise Exception(msg)
except Exception:
with excutils.save_and_reraise_exception():
pass
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
def test_save_and_reraise_exception_dropped(self):
e = None
msg = 'second exception'
with mock.patch('logging.error') as log:
try:
try:
raise Exception('dropped')
except Exception:
with excutils.save_and_reraise_exception():
raise Exception(msg)
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
self.assertTrue(log.called)
def test_save_and_reraise_exception_no_reraise(self):
"""Test that suppressing the reraise works."""
try:
raise Exception('foo')
except Exception:
with excutils.save_and_reraise_exception() as ctxt:
ctxt.reraise = False
def test_save_and_reraise_exception_dropped_no_reraise(self):
e = None
msg = 'second exception'
with mock.patch('logging.error') as log:
try:
try:
raise Exception('dropped')
except Exception:
with excutils.save_and_reraise_exception(reraise=False):
raise Exception(msg)
except Exception as _e:
e = _e
self.assertEqual(str(e), msg)
self.assertFalse(log.called)
class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase):
def setUp(self):
super(ForeverRetryUncaughtExceptionsTest, self).setUp()
moxfixture = self.useFixture(moxstubout.MoxStubout())
self.mox = moxfixture.mox
self.stubs = moxfixture.stubs
@excutils.forever_retry_uncaught_exceptions
def exception_generator(self):
exc = self.exception_to_raise()
while exc is not None:
raise exc
exc = self.exception_to_raise()
def exception_to_raise(self):
return None
def my_time_sleep(self, arg):
pass
def my_time_time(self, args):
pass
def exc_retrier_common_start(self):
self.stubs.Set(time, 'sleep', self.my_time_sleep)
self.mox.StubOutWithMock(logging, 'exception')
self.mox.StubOutWithMock(time, 'time', self.my_time_time)
self.mox.StubOutWithMock(self, 'exception_to_raise')
def exc_retrier_sequence(self, exc_id=None, timestamp=None,
exc_count=None):
self.exception_to_raise().AndReturn(
Exception('unexpected %d' % exc_id))
time.time().AndReturn(timestamp)
if exc_count != 0:
logging.exception(mox.In(
'Unexpected exception occurred %d time(s)' % exc_count))
def exc_retrier_common_end(self):
self.exception_to_raise().AndReturn(None)
self.mox.ReplayAll()
self.exception_generator()
self.addCleanup(self.stubs.UnsetAll)
def test_exc_retrier_1exc_gives_1log(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_common_end()
def test_exc_retrier_same_10exc_1min_gives_1log(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# By design, the following exception don't get logged because they
# are within the same minute.
for i in range(2, 11):
self.exc_retrier_sequence(exc_id=1, timestamp=i, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_same_2exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_sequence(exc_id=1, timestamp=65, exc_count=1)
self.exc_retrier_common_end()
def test_exc_retrier_same_10exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
self.exc_retrier_sequence(exc_id=1, timestamp=12, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=23, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=34, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=45, exc_count=0)
# The previous 4 exceptions are counted here
self.exc_retrier_sequence(exc_id=1, timestamp=106, exc_count=5)
# Again, the following are not logged due to being within
# the same minute
self.exc_retrier_sequence(exc_id=1, timestamp=117, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=128, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=139, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=150, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_1min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# By design, this second 'unexpected 1' exception is not counted. This
# is likely a rare thing and is a sacrifice for code simplicity.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
self.exc_retrier_sequence(exc_id=2, timestamp=20, exc_count=1)
# Again, trailing exceptions within a minute are not counted.
self.exc_retrier_sequence(exc_id=2, timestamp=30, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_2min_gives_2logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# Again, this second exception of the same type is not counted
# for the sake of code simplicity.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
# The difference between this and the previous case is the log
# is also triggered by more than a minute expiring.
self.exc_retrier_sequence(exc_id=2, timestamp=100, exc_count=1)
self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=0)
self.exc_retrier_common_end()
def test_exc_retrier_mixed_4exc_2min_gives_3logs(self):
self.exc_retrier_common_start()
self.exc_retrier_sequence(exc_id=1, timestamp=1, exc_count=1)
# This time the second 'unexpected 1' exception is counted due
# to the same exception occurring same when the minute expires.
self.exc_retrier_sequence(exc_id=1, timestamp=10, exc_count=0)
self.exc_retrier_sequence(exc_id=1, timestamp=100, exc_count=2)
self.exc_retrier_sequence(exc_id=2, timestamp=110, exc_count=1)
self.exc_retrier_common_end()

View File

@ -1,118 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import sys
from oslotest import base as test_base
from openstack.common import importutils
class ImportUtilsTest(test_base.BaseTestCase):
# NOTE(jkoelker) There has GOT to be a way to test this. But mocking
# __import__ is the devil. Right now we just make
# sure we can import something from the stdlib
def test_import_class(self):
dt = importutils.import_class('datetime.datetime')
self.assertEqual(sys.modules['datetime'].datetime, dt)
def test_import_bad_class(self):
self.assertRaises(ImportError, importutils.import_class,
'lol.u_mad.brah')
def test_import_module(self):
dt = importutils.import_module('datetime')
self.assertEqual(sys.modules['datetime'], dt)
def test_import_object_optional_arg_not_present(self):
obj = importutils.import_object('tests.unit.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_optional_arg_present(self):
obj = importutils.import_object('tests.unit.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object,
'tests.unit.fake.FakeDriver2')
def test_import_object_required_arg_present(self):
obj = importutils.import_object('tests.unit.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_optional_arg_not_present(self):
obj = importutils.import_object_ns('tests.unit', 'fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_optional_arg_present(self):
obj = importutils.import_object_ns('tests.unit', 'fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
'tests.unit', 'fake.FakeDriver2')
def test_import_object_ns_required_arg_present(self):
obj = importutils.import_object_ns('tests.unit', 'fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object_ns_full_optional_arg_not_present(self):
obj = importutils.import_object_ns('tests.unit2',
'tests.unit.fake.FakeDriver')
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_optional_arg_present(self):
obj = importutils.import_object_ns('tests.unit2',
'tests.unit.fake.FakeDriver',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_ns_full_required_arg_not_present(self):
# arg 1 isn't optional here
self.assertRaises(TypeError, importutils.import_object_ns,
'tests.unit2', 'tests.unit.fake.FakeDriver2')
def test_import_object_ns_full_required_arg_present(self):
obj = importutils.import_object_ns('tests.unit2',
'tests.unit.fake.FakeDriver2',
first_arg=False)
self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
def test_import_object(self):
dt = importutils.import_object('datetime.time')
self.assertTrue(isinstance(dt, sys.modules['datetime'].time))
def test_import_object_with_args(self):
dt = importutils.import_object('datetime.datetime', 2012, 4, 5)
self.assertTrue(isinstance(dt, sys.modules['datetime'].datetime))
self.assertEqual(dt, datetime.datetime(2012, 4, 5))
def test_try_import(self):
dt = importutils.try_import('datetime')
self.assertEqual(sys.modules['datetime'], dt)
def test_try_import_returns_default(self):
foo = importutils.try_import('foo.bar')
self.assertIsNone(foo)

View File

@ -1,119 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import mock
from oslotest import base as test_base
from openstack.common import network_utils
class NetworkUtilsTest(test_base.BaseTestCase):
def test_no_host(self):
result = network_utils.urlsplit('http://')
self.assertEqual('', result.netloc)
self.assertEqual(None, result.port)
self.assertEqual(None, result.hostname)
self.assertEqual('http', result.scheme)
def test_parse_host_port(self):
self.assertEqual(('server01', 80),
network_utils.parse_host_port('server01:80'))
self.assertEqual(('server01', None),
network_utils.parse_host_port('server01'))
self.assertEqual(('server01', 1234),
network_utils.parse_host_port('server01',
default_port=1234))
self.assertEqual(('::1', 80),
network_utils.parse_host_port('[::1]:80'))
self.assertEqual(('::1', None),
network_utils.parse_host_port('[::1]'))
self.assertEqual(('::1', 1234),
network_utils.parse_host_port('[::1]',
default_port=1234))
self.assertEqual(('2001:db8:85a3::8a2e:370:7334', 1234),
network_utils.parse_host_port(
'2001:db8:85a3::8a2e:370:7334',
default_port=1234))
def test_urlsplit(self):
result = network_utils.urlsplit('rpc://myhost?someparam#somefragment')
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'myhost')
self.assertEqual(result.path, '')
self.assertEqual(result.query, 'someparam')
self.assertEqual(result.fragment, 'somefragment')
result = network_utils.urlsplit(
'rpc://myhost/mypath?someparam#somefragment',
allow_fragments=False)
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'myhost')
self.assertEqual(result.path, '/mypath')
self.assertEqual(result.query, 'someparam#somefragment')
self.assertEqual(result.fragment, '')
result = network_utils.urlsplit(
'rpc://user:pass@myhost/mypath?someparam#somefragment',
allow_fragments=False)
self.assertEqual(result.scheme, 'rpc')
self.assertEqual(result.netloc, 'user:pass@myhost')
self.assertEqual(result.path, '/mypath')
self.assertEqual(result.query, 'someparam#somefragment')
self.assertEqual(result.fragment, '')
def test_urlsplit_ipv6(self):
ipv6_url = 'http://[::1]:443/v2.0/'
result = network_utils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'http')
self.assertEqual(result.netloc, '[::1]:443')
self.assertEqual(result.path, '/v2.0/')
self.assertEqual(result.hostname, '::1')
self.assertEqual(result.port, 443)
ipv6_url = 'http://user:pass@[::1]/v2.0/'
result = network_utils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'http')
self.assertEqual(result.netloc, 'user:pass@[::1]')
self.assertEqual(result.path, '/v2.0/')
self.assertEqual(result.hostname, '::1')
self.assertEqual(result.port, None)
ipv6_url = 'https://[2001:db8:85a3::8a2e:370:7334]:1234/v2.0/xy?ab#12'
result = network_utils.urlsplit(ipv6_url)
self.assertEqual(result.scheme, 'https')
self.assertEqual(result.netloc, '[2001:db8:85a3::8a2e:370:7334]:1234')
self.assertEqual(result.path, '/v2.0/xy')
self.assertEqual(result.hostname, '2001:db8:85a3::8a2e:370:7334')
self.assertEqual(result.port, 1234)
self.assertEqual(result.query, 'ab')
self.assertEqual(result.fragment, '12')
def test_set_tcp_keepalive(self):
mock_sock = mock.Mock()
network_utils.set_tcp_keepalive(mock_sock, True, 100, 10, 5)
calls = [
mock.call.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True),
mock.call.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 100),
mock.call.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10),
mock.call.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
]
mock_sock.assert_has_calls(calls)
mock_sock.reset_mock()
network_utils.set_tcp_keepalive(mock_sock, False)
self.assertEqual(1, len(mock_sock.mock_calls))

View File

@ -1,596 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import math
import mock
from oslotest import base as test_base
import six
import testscenarios
from openstack.common import strutils
from openstack.common import units
load_tests = testscenarios.load_tests_apply_scenarios
class StrUtilsTest(test_base.BaseTestCase):
def test_bool_bool_from_string(self):
self.assertTrue(strutils.bool_from_string(True))
self.assertFalse(strutils.bool_from_string(False))
def test_bool_bool_from_string_default(self):
self.assertTrue(strutils.bool_from_string('', default=True))
self.assertFalse(strutils.bool_from_string('wibble', default=False))
def _test_bool_from_string(self, c):
self.assertTrue(strutils.bool_from_string(c('true')))
self.assertTrue(strutils.bool_from_string(c('TRUE')))
self.assertTrue(strutils.bool_from_string(c('on')))
self.assertTrue(strutils.bool_from_string(c('On')))
self.assertTrue(strutils.bool_from_string(c('yes')))
self.assertTrue(strutils.bool_from_string(c('YES')))
self.assertTrue(strutils.bool_from_string(c('yEs')))
self.assertTrue(strutils.bool_from_string(c('1')))
self.assertTrue(strutils.bool_from_string(c('T')))
self.assertTrue(strutils.bool_from_string(c('t')))
self.assertTrue(strutils.bool_from_string(c('Y')))
self.assertTrue(strutils.bool_from_string(c('y')))
self.assertFalse(strutils.bool_from_string(c('false')))
self.assertFalse(strutils.bool_from_string(c('FALSE')))
self.assertFalse(strutils.bool_from_string(c('off')))
self.assertFalse(strutils.bool_from_string(c('OFF')))
self.assertFalse(strutils.bool_from_string(c('no')))
self.assertFalse(strutils.bool_from_string(c('0')))
self.assertFalse(strutils.bool_from_string(c('42')))
self.assertFalse(strutils.bool_from_string(c(
'This should not be True')))
self.assertFalse(strutils.bool_from_string(c('F')))
self.assertFalse(strutils.bool_from_string(c('f')))
self.assertFalse(strutils.bool_from_string(c('N')))
self.assertFalse(strutils.bool_from_string(c('n')))
# Whitespace should be stripped
self.assertTrue(strutils.bool_from_string(c(' 1 ')))
self.assertTrue(strutils.bool_from_string(c(' true ')))
self.assertFalse(strutils.bool_from_string(c(' 0 ')))
self.assertFalse(strutils.bool_from_string(c(' false ')))
def test_bool_from_string(self):
self._test_bool_from_string(lambda s: s)
def test_unicode_bool_from_string(self):
self._test_bool_from_string(six.text_type)
self.assertFalse(strutils.bool_from_string(u'使用', strict=False))
exc = self.assertRaises(ValueError, strutils.bool_from_string,
u'使用', strict=True)
expected_msg = (u"Unrecognized value '使用', acceptable values are:"
u" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
u" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, six.text_type(exc))
def test_other_bool_from_string(self):
self.assertFalse(strutils.bool_from_string(None))
self.assertFalse(strutils.bool_from_string(mock.Mock()))
def test_int_bool_from_string(self):
self.assertTrue(strutils.bool_from_string(1))
self.assertFalse(strutils.bool_from_string(-1))
self.assertFalse(strutils.bool_from_string(0))
self.assertFalse(strutils.bool_from_string(2))
def test_strict_bool_from_string(self):
# None isn't allowed in strict mode
exc = self.assertRaises(ValueError, strutils.bool_from_string, None,
strict=True)
expected_msg = ("Unrecognized value 'None', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# Unrecognized strings aren't allowed
self.assertFalse(strutils.bool_from_string('Other', strict=False))
exc = self.assertRaises(ValueError, strutils.bool_from_string, 'Other',
strict=True)
expected_msg = ("Unrecognized value 'Other', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# Unrecognized numbers aren't allowed
exc = self.assertRaises(ValueError, strutils.bool_from_string, 2,
strict=True)
expected_msg = ("Unrecognized value '2', acceptable values are:"
" '0', '1', 'f', 'false', 'n', 'no', 'off', 'on',"
" 't', 'true', 'y', 'yes'")
self.assertEqual(expected_msg, str(exc))
# False-like values are allowed
self.assertFalse(strutils.bool_from_string('f', strict=True))
self.assertFalse(strutils.bool_from_string('false', strict=True))
self.assertFalse(strutils.bool_from_string('off', strict=True))
self.assertFalse(strutils.bool_from_string('n', strict=True))
self.assertFalse(strutils.bool_from_string('no', strict=True))
self.assertFalse(strutils.bool_from_string('0', strict=True))
self.assertTrue(strutils.bool_from_string('1', strict=True))
# Avoid font-similarity issues (one looks like lowercase-el, zero like
# oh, etc...)
for char in ('O', 'o', 'L', 'l', 'I', 'i'):
self.assertRaises(ValueError, strutils.bool_from_string, char,
strict=True)
def test_int_from_bool_as_string(self):
self.assertEqual(1, strutils.int_from_bool_as_string(True))
self.assertEqual(0, strutils.int_from_bool_as_string(False))
def test_safe_decode(self):
safe_decode = strutils.safe_decode
self.assertRaises(TypeError, safe_decode, True)
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b("ni\xc3\xb1o"),
incoming="utf-8"))
if six.PY2:
# In Python 3, bytes.decode() doesn't support anymore
# bytes => bytes encodings like base64
self.assertEqual(six.u("test"), safe_decode("dGVzdA==",
incoming='base64'))
self.assertEqual(six.u("strange"), safe_decode(six.b('\x80strange'),
incoming='utf-8', errors='ignore'))
self.assertEqual(six.u('\xc0'), safe_decode(six.b('\xc0'),
incoming='iso-8859-1'))
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(six.u('ni\xf1o'), safe_decode(six.b('ni\xc3\xb1o'),
incoming='ascii'))
self.assertEqual(six.u('foo'), safe_decode(b'foo'))
def test_safe_encode(self):
safe_encode = strutils.safe_encode
self.assertRaises(TypeError, safe_encode, True)
self.assertEqual(six.b("ni\xc3\xb1o"), safe_encode(six.u('ni\xf1o'),
encoding="utf-8"))
if six.PY2:
# In Python 3, str.encode() doesn't support anymore
# text => text encodings like base64
self.assertEqual(six.b("dGVzdA==\n"),
safe_encode("test", encoding='base64'))
self.assertEqual(six.b('ni\xf1o'), safe_encode(six.b("ni\xc3\xb1o"),
encoding="iso-8859-1",
incoming="utf-8"))
# Forcing incoming to ascii so it falls back to utf-8
self.assertEqual(six.b('ni\xc3\xb1o'),
safe_encode(six.b('ni\xc3\xb1o'), incoming='ascii'))
self.assertEqual(six.b('foo'), safe_encode(six.u('foo')))
def test_slugify(self):
to_slug = strutils.to_slug
self.assertRaises(TypeError, to_slug, True)
self.assertEqual(six.u("hello"), to_slug("hello"))
self.assertEqual(six.u("two-words"), to_slug("Two Words"))
self.assertEqual(six.u("ma-any-spa-ce-es"),
to_slug("Ma-any\t spa--ce- es"))
self.assertEqual(six.u("excamation"), to_slug("exc!amation!"))
self.assertEqual(six.u("ampserand"), to_slug("&ampser$and"))
self.assertEqual(six.u("ju5tnum8er"), to_slug("ju5tnum8er"))
self.assertEqual(six.u("strip-"), to_slug(" strip - "))
self.assertEqual(six.u("perche"),
to_slug(six.b("perch\xc3\xa9"), incoming='utf-8'))
self.assertEqual(six.u("strange"),
to_slug("\x80strange", incoming='utf-8',
errors="ignore"))
class StringToBytesTest(test_base.BaseTestCase):
_unit_system = [
('si', dict(unit_system='SI')),
('iec', dict(unit_system='IEC')),
('invalid_unit_system', dict(unit_system='KKK', assert_error=True)),
]
_sign = [
('no_sign', dict(sign='')),
('positive', dict(sign='+')),
('negative', dict(sign='-')),
('invalid_sign', dict(sign='~', assert_error=True)),
]
_magnitude = [
('integer', dict(magnitude='79')),
('decimal', dict(magnitude='7.9')),
('decimal_point_start', dict(magnitude='.9')),
('decimal_point_end', dict(magnitude='79.', assert_error=True)),
('invalid_literal', dict(magnitude='7.9.9', assert_error=True)),
('garbage_value', dict(magnitude='asdf', assert_error=True)),
]
_unit_prefix = [
('no_unit_prefix', dict(unit_prefix='')),
('k', dict(unit_prefix='k')),
('K', dict(unit_prefix='K')),
('M', dict(unit_prefix='M')),
('G', dict(unit_prefix='G')),
('T', dict(unit_prefix='T')),
('Ki', dict(unit_prefix='Ki')),
('Mi', dict(unit_prefix='Mi')),
('Gi', dict(unit_prefix='Gi')),
('Ti', dict(unit_prefix='Ti')),
('invalid_unit_prefix', dict(unit_prefix='B', assert_error=True)),
]
_unit_suffix = [
('b', dict(unit_suffix='b')),
('bit', dict(unit_suffix='bit')),
('B', dict(unit_suffix='B')),
('invalid_unit_suffix', dict(unit_suffix='Kg', assert_error=True)),
]
_return_int = [
('return_dec', dict(return_int=False)),
('return_int', dict(return_int=True)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._unit_system,
cls._sign,
cls._magnitude,
cls._unit_prefix,
cls._unit_suffix,
cls._return_int)
def test_string_to_bytes(self):
def _get_quantity(sign, magnitude, unit_suffix):
res = float('%s%s' % (sign, magnitude))
if unit_suffix in ['b', 'bit']:
res /= 8
return res
def _get_constant(unit_prefix, unit_system):
if not unit_prefix:
return 1
elif unit_system == 'SI':
res = getattr(units, unit_prefix)
elif unit_system == 'IEC':
if unit_prefix.endswith('i'):
res = getattr(units, unit_prefix)
else:
res = getattr(units, '%si' % unit_prefix)
return res
text = ''.join([self.sign, self.magnitude, self.unit_prefix,
self.unit_suffix])
err_si = self.unit_system == 'SI' and (self.unit_prefix == 'K' or
self.unit_prefix.endswith('i'))
err_iec = self.unit_system == 'IEC' and self.unit_prefix == 'k'
if getattr(self, 'assert_error', False) or err_si or err_iec:
self.assertRaises(ValueError, strutils.string_to_bytes,
text, unit_system=self.unit_system,
return_int=self.return_int)
return
quantity = _get_quantity(self.sign, self.magnitude, self.unit_suffix)
constant = _get_constant(self.unit_prefix, self.unit_system)
expected = quantity * constant
actual = strutils.string_to_bytes(text, unit_system=self.unit_system,
return_int=self.return_int)
if self.return_int:
self.assertEqual(actual, int(math.ceil(expected)))
else:
self.assertAlmostEqual(actual, expected)
StringToBytesTest.generate_scenarios()
class MaskPasswordTestCase(test_base.BaseTestCase):
def test_json(self):
# Test 'adminPass' w/o spaces
payload = """{'adminPass':'mypassword'}"""
expected = """{'adminPass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """{ 'adminPass' : 'mypassword' }"""
expected = """{ 'adminPass' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """{'admin_pass':'mypassword'}"""
expected = """{'admin_pass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """{ 'admin_pass' : 'mypassword' }"""
expected = """{ 'admin_pass' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """{'admin_password':'mypassword'}"""
expected = """{'admin_password':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """{ 'admin_password' : 'mypassword' }"""
expected = """{ 'admin_password' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """{'password':'mypassword'}"""
expected = """{'password':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """{ 'password' : 'mypassword' }"""
expected = """{ 'password' : '***' }"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml(self):
# Test 'adminPass' w/o spaces
payload = """<adminPass>mypassword</adminPass>"""
expected = """<adminPass>***</adminPass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """<adminPass>
mypassword
</adminPass>"""
expected = """<adminPass>***</adminPass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """<admin_pass>mypassword</admin_pass>"""
expected = """<admin_pass>***</admin_pass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """<admin_pass>
mypassword
</admin_pass>"""
expected = """<admin_pass>***</admin_pass>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """<admin_password>mypassword</admin_password>"""
expected = """<admin_password>***</admin_password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """<admin_password>
mypassword
</admin_password>"""
expected = """<admin_password>***</admin_password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """<password>mypassword</password>"""
expected = """<password>***</password>"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """<password>
mypassword
</password>"""
expected = """<password>***</password>"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml_attribute(self):
# Test 'adminPass' w/o spaces
payload = """adminPass='mypassword'"""
expected = """adminPass='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with spaces
payload = """adminPass = 'mypassword'"""
expected = """adminPass = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'adminPass' with double quotes
payload = """adminPass = "mypassword\""""
expected = """adminPass = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' w/o spaces
payload = """admin_pass='mypassword'"""
expected = """admin_pass='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with spaces
payload = """admin_pass = 'mypassword'"""
expected = """admin_pass = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_pass' with double quotes
payload = """admin_pass = "mypassword\""""
expected = """admin_pass = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' w/o spaces
payload = """admin_password='mypassword'"""
expected = """admin_password='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with spaces
payload = """admin_password = 'mypassword'"""
expected = """admin_password = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'admin_password' with double quotes
payload = """admin_password = "mypassword\""""
expected = """admin_password = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' w/o spaces
payload = """password='mypassword'"""
expected = """password='***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with spaces
payload = """password = 'mypassword'"""
expected = """password = '***'"""
self.assertEqual(expected, strutils.mask_password(payload))
# Test 'password' with double quotes
payload = """password = "mypassword\""""
expected = """password = "***\""""
self.assertEqual(expected, strutils.mask_password(payload))
def test_json_message(self):
payload = """body: {"changePassword": {"adminPass": "1234567"}}"""
expected = """body: {"changePassword": {"adminPass": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"admin_pass": "1234567"}}"""
expected = """body: {"rescue": {"admin_pass": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"admin_password": "1234567"}}"""
expected = """body: {"rescue": {"admin_password": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """body: {"rescue": {"password": "1234567"}}"""
expected = """body: {"rescue": {"password": "***"}}"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_xml_message(self):
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rebuild
xmlns="http://docs.openstack.org/compute/api/v1.1"
name="foobar"
imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
accessIPv4="1.2.3.4"
accessIPv6="fe80::100"
adminPass="seekr3t">
<metadata>
<meta key="My Server Name">Apache1</meta>
</metadata>
</rebuild>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rebuild
xmlns="http://docs.openstack.org/compute/api/v1.1"
name="foobar"
imageRef="http://openstack.example.com/v1.1/32278/images/70a599e0-31e7"
accessIPv4="1.2.3.4"
accessIPv6="fe80::100"
adminPass="***">
<metadata>
<meta key="My Server Name">Apache1</meta>
</metadata>
</rebuild>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_pass="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_pass="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_password="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
admin_password="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
password="MySecretPass"/>"""
expected = """<?xml version="1.0" encoding="UTF-8"?>
<rescue xmlns="http://docs.openstack.org/compute/api/v1.1"
password="***"/>"""
self.assertEqual(expected, strutils.mask_password(payload))
def test_mask_password(self):
payload = "test = 'password' : 'aaaaaa'"
expected = "test = 'password' : '111'"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = 'mysqld --password "aaaaaa"'
expected = 'mysqld --password "****"'
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = 'mysqld --password aaaaaa'
expected = 'mysqld --password ???'
self.assertEqual(expected,
strutils.mask_password(payload, secret='???'))
payload = 'mysqld --password = "aaaaaa"'
expected = 'mysqld --password = "****"'
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "mysqld --password = 'aaaaaa'"
expected = "mysqld --password = '****'"
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "mysqld --password = aaaaaa"
expected = "mysqld --password = ****"
self.assertEqual(expected,
strutils.mask_password(payload, secret='****'))
payload = "test = password = aaaaaa"
expected = "test = password = 111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password= aaaaaa"
expected = "test = password= 111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password =aaaaaa"
expected = "test = password =111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = "test = password=aaaaaa"
expected = "test = password=111"
self.assertEqual(expected,
strutils.mask_password(payload, secret='111'))
payload = 'test = "original_password" : "aaaaaaaaa"'
expected = 'test = "original_password" : "***"'
self.assertEqual(expected, strutils.mask_password(payload))
payload = 'test = "param1" : "value"'
expected = 'test = "param1" : "value"'
self.assertEqual(expected, strutils.mask_password(payload))
payload = """{'adminPass':'mypassword'}"""
payload = six.text_type(payload)
expected = """{'adminPass':'***'}"""
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password','-v','mypassword',"
"'nomask'")
expected = ("test = 'node.session.auth.password','-v','***',"
"'nomask'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password', '--password', "
"'mypassword', 'nomask'")
expected = ("test = 'node.session.auth.password', '--password', "
"'***', 'nomask'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = 'node.session.auth.password', '--password', "
"'mypassword'")
expected = ("test = 'node.session.auth.password', '--password', "
"'***'")
self.assertEqual(expected, strutils.mask_password(payload))
payload = "test = node.session.auth.password -v mypassword nomask"
expected = "test = node.session.auth.password -v *** nomask"
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = node.session.auth.password --password mypassword "
"nomask")
expected = ("test = node.session.auth.password --password *** "
"nomask")
self.assertEqual(expected, strutils.mask_password(payload))
payload = ("test = node.session.auth.password --password mypassword")
expected = ("test = node.session.auth.password --password ***")
self.assertEqual(expected, strutils.mask_password(payload))
payload = "test = cmd --password my\xe9\x80\x80pass"
expected = ("test = cmd --password ***")
self.assertEqual(expected, strutils.mask_password(payload))

View File

@ -1,340 +0,0 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import calendar
import datetime
import time
import iso8601
import mock
from oslotest import base as test_base
from testtools import matchers
from openstack.common import timeutils
class TimeUtilsTest(test_base.BaseTestCase):
def setUp(self):
super(TimeUtilsTest, self).setUp()
self.skynet_self_aware_time_str = '1997-08-29T06:14:00Z'
self.skynet_self_aware_time_ms_str = '1997-08-29T06:14:00.000123Z'
self.skynet_self_aware_time = datetime.datetime(1997, 8, 29, 6, 14, 0)
self.skynet_self_aware_ms_time = datetime.datetime(
1997, 8, 29, 6, 14, 0, 123)
self.one_minute_before = datetime.datetime(1997, 8, 29, 6, 13, 0)
self.one_minute_after = datetime.datetime(1997, 8, 29, 6, 15, 0)
self.skynet_self_aware_time_perfect_str = '1997-08-29T06:14:00.000000'
self.skynet_self_aware_time_perfect = datetime.datetime(1997, 8, 29,
6, 14, 0)
self.addCleanup(timeutils.clear_time_override)
def test_isotime(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
dt = timeutils.isotime()
self.assertEqual(dt, self.skynet_self_aware_time_str)
def test_isotimei_micro_second_precision(self):
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_ms_time
dt = timeutils.isotime(subsecond=True)
self.assertEqual(dt, self.skynet_self_aware_time_ms_str)
def test_parse_isotime(self):
expect = timeutils.parse_isotime(self.skynet_self_aware_time_str)
skynet_self_aware_time_utc = self.skynet_self_aware_time.replace(
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_utc, expect)
def test_parse_isotime_micro_second_precision(self):
expect = timeutils.parse_isotime(self.skynet_self_aware_time_ms_str)
skynet_self_aware_time_ms_utc = self.skynet_self_aware_ms_time.replace(
tzinfo=iso8601.iso8601.UTC)
self.assertEqual(skynet_self_aware_time_ms_utc, expect)
def test_strtime(self):
expect = timeutils.strtime(self.skynet_self_aware_time_perfect)
self.assertEqual(self.skynet_self_aware_time_perfect_str, expect)
def test_parse_strtime(self):
perfect_time_format = self.skynet_self_aware_time_perfect_str
expect = timeutils.parse_strtime(perfect_time_format)
self.assertEqual(self.skynet_self_aware_time_perfect, expect)
def test_strtime_and_back(self):
orig_t = datetime.datetime(1997, 8, 29, 6, 14, 0)
s = timeutils.strtime(orig_t)
t = timeutils.parse_strtime(s)
self.assertEqual(orig_t, t)
def _test_is_older_than(self, fn):
strptime = datetime.datetime.strptime
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
datetime_mock.strptime = strptime
expect_true = timeutils.is_older_than(fn(self.one_minute_before),
59)
self.assertTrue(expect_true)
expect_false = timeutils.is_older_than(fn(self.one_minute_before),
60)
self.assertFalse(expect_false)
expect_false = timeutils.is_older_than(fn(self.one_minute_before),
61)
self.assertFalse(expect_false)
def test_is_older_than_datetime(self):
self._test_is_older_than(lambda x: x)
def test_is_older_than_str(self):
self._test_is_older_than(timeutils.strtime)
def test_is_older_than_aware(self):
"""Tests sending is_older_than an 'aware' datetime."""
self._test_is_older_than(lambda x: x.replace(
tzinfo=iso8601.iso8601.UTC))
def _test_is_newer_than(self, fn):
strptime = datetime.datetime.strptime
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
datetime_mock.strptime = strptime
expect_true = timeutils.is_newer_than(fn(self.one_minute_after),
59)
self.assertTrue(expect_true)
expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
60)
self.assertFalse(expect_false)
expect_false = timeutils.is_newer_than(fn(self.one_minute_after),
61)
self.assertFalse(expect_false)
def test_is_newer_than_datetime(self):
self._test_is_newer_than(lambda x: x)
def test_is_newer_than_str(self):
self._test_is_newer_than(timeutils.strtime)
def test_is_newer_than_aware(self):
"""Tests sending is_newer_than an 'aware' datetime."""
self._test_is_newer_than(lambda x: x.replace(
tzinfo=iso8601.iso8601.UTC))
def test_set_time_override_using_default(self):
now = timeutils.utcnow_ts()
# NOTE(kgriffs): Normally it's bad form to sleep in a unit test,
# but this is the only way to test that set_time_override defaults
# to setting the override to the current time.
time.sleep(1)
timeutils.set_time_override()
overriden_now = timeutils.utcnow_ts()
self.assertThat(now, matchers.LessThan(overriden_now))
def test_utcnow_ts(self):
skynet_self_aware_ts = 872835240
skynet_dt = datetime.datetime.utcfromtimestamp(skynet_self_aware_ts)
self.assertEqual(self.skynet_self_aware_time, skynet_dt)
# NOTE(kgriffs): timeutils.utcnow_ts() uses time.time()
# IFF time override is not set.
with mock.patch('time.time') as time_mock:
time_mock.return_value = skynet_self_aware_ts
ts = timeutils.utcnow_ts()
self.assertEqual(ts, skynet_self_aware_ts)
timeutils.set_time_override(skynet_dt)
ts = timeutils.utcnow_ts()
self.assertEqual(ts, skynet_self_aware_ts)
def test_utcnow(self):
timeutils.set_time_override(mock.sentinel.utcnow)
self.assertEqual(timeutils.utcnow(), mock.sentinel.utcnow)
timeutils.clear_time_override()
self.assertFalse(timeutils.utcnow() == mock.sentinel.utcnow)
self.assertTrue(timeutils.utcnow())
def test_advance_time_delta(self):
timeutils.set_time_override(self.one_minute_before)
timeutils.advance_time_delta(datetime.timedelta(seconds=60))
self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
def test_advance_time_seconds(self):
timeutils.set_time_override(self.one_minute_before)
timeutils.advance_time_seconds(60)
self.assertEqual(timeutils.utcnow(), self.skynet_self_aware_time)
def test_marshall_time(self):
now = timeutils.utcnow()
binary = timeutils.marshall_now(now)
backagain = timeutils.unmarshall_time(binary)
self.assertEqual(now, backagain)
def test_delta_seconds(self):
before = timeutils.utcnow()
after = before + datetime.timedelta(days=7, seconds=59,
microseconds=123456)
self.assertAlmostEquals(604859.123456,
timeutils.delta_seconds(before, after))
def test_total_seconds(self):
delta = datetime.timedelta(days=1, hours=2, minutes=3, seconds=4.5)
self.assertAlmostEquals(93784.5,
timeutils.total_seconds(delta))
def test_iso8601_from_timestamp(self):
utcnow = timeutils.utcnow()
iso = timeutils.isotime(utcnow)
ts = calendar.timegm(utcnow.timetuple())
self.assertEqual(iso, timeutils.iso8601_from_timestamp(ts))
def test_is_soon(self):
expires = timeutils.utcnow() + datetime.timedelta(minutes=5)
self.assertFalse(timeutils.is_soon(expires, 120))
self.assertTrue(timeutils.is_soon(expires, 300))
self.assertTrue(timeutils.is_soon(expires, 600))
with mock.patch('datetime.datetime') as datetime_mock:
datetime_mock.utcnow.return_value = self.skynet_self_aware_time
expires = timeutils.utcnow()
self.assertTrue(timeutils.is_soon(expires, 0))
class TestIso8601Time(test_base.BaseTestCase):
def _instaneous(self, timestamp, yr, mon, day, hr, minute, sec, micro):
self.assertEqual(timestamp.year, yr)
self.assertEqual(timestamp.month, mon)
self.assertEqual(timestamp.day, day)
self.assertEqual(timestamp.hour, hr)
self.assertEqual(timestamp.minute, minute)
self.assertEqual(timestamp.second, sec)
self.assertEqual(timestamp.microsecond, micro)
def _do_test(self, time_str, yr, mon, day, hr, minute, sec, micro, shift):
DAY_SECONDS = 24 * 60 * 60
timestamp = timeutils.parse_isotime(time_str)
self._instaneous(timestamp, yr, mon, day, hr, minute, sec, micro)
offset = timestamp.tzinfo.utcoffset(None)
self.assertEqual(offset.seconds + offset.days * DAY_SECONDS, shift)
def test_zulu(self):
time_str = '2012-02-14T20:53:07Z'
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, 0)
def test_zulu_micros(self):
time_str = '2012-02-14T20:53:07.123Z'
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 123000, 0)
def test_offset_east(self):
time_str = '2012-02-14T20:53:07+04:30'
offset = 4.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
def test_offset_east_micros(self):
time_str = '2012-02-14T20:53:07.42+04:30'
offset = 4.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 420000, offset)
def test_offset_west(self):
time_str = '2012-02-14T20:53:07-05:30'
offset = -5.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 0, offset)
def test_offset_west_micros(self):
time_str = '2012-02-14T20:53:07.654321-05:30'
offset = -5.5 * 60 * 60
self._do_test(time_str, 2012, 2, 14, 20, 53, 7, 654321, offset)
def test_compare(self):
zulu = timeutils.parse_isotime('2012-02-14T20:53:07')
east = timeutils.parse_isotime('2012-02-14T20:53:07-01:00')
west = timeutils.parse_isotime('2012-02-14T20:53:07+01:00')
self.assertTrue(east > west)
self.assertTrue(east > zulu)
self.assertTrue(zulu > west)
def test_compare_micros(self):
zulu = timeutils.parse_isotime('2012-02-14T20:53:07.6544')
east = timeutils.parse_isotime('2012-02-14T19:53:07.654321-01:00')
west = timeutils.parse_isotime('2012-02-14T21:53:07.655+01:00')
self.assertTrue(east < west)
self.assertTrue(east < zulu)
self.assertTrue(zulu < west)
def test_zulu_roundtrip(self):
time_str = '2012-02-14T20:53:07Z'
zulu = timeutils.parse_isotime(time_str)
self.assertEqual(zulu.tzinfo, iso8601.iso8601.UTC)
self.assertEqual(timeutils.isotime(zulu), time_str)
def test_east_roundtrip(self):
time_str = '2012-02-14T20:53:07-07:00'
east = timeutils.parse_isotime(time_str)
self.assertEqual(east.tzinfo.tzname(None), '-07:00')
self.assertEqual(timeutils.isotime(east), time_str)
def test_west_roundtrip(self):
time_str = '2012-02-14T20:53:07+11:30'
west = timeutils.parse_isotime(time_str)
self.assertEqual(west.tzinfo.tzname(None), '+11:30')
self.assertEqual(timeutils.isotime(west), time_str)
def test_now_roundtrip(self):
time_str = timeutils.isotime()
now = timeutils.parse_isotime(time_str)
self.assertEqual(now.tzinfo, iso8601.iso8601.UTC)
self.assertEqual(timeutils.isotime(now), time_str)
def test_zulu_normalize(self):
time_str = '2012-02-14T20:53:07Z'
zulu = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(zulu)
self._instaneous(normed, 2012, 2, 14, 20, 53, 7, 0)
def test_east_normalize(self):
time_str = '2012-02-14T20:53:07-07:00'
east = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(east)
self._instaneous(normed, 2012, 2, 15, 3, 53, 7, 0)
def test_west_normalize(self):
time_str = '2012-02-14T20:53:07+21:00'
west = timeutils.parse_isotime(time_str)
normed = timeutils.normalize_time(west)
self._instaneous(normed, 2012, 2, 13, 23, 53, 7, 0)
def test_normalize_aware_to_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
time_str = '2011-02-14T20:53:07+21:00'
aware = timeutils.parse_isotime(time_str)
naive = timeutils.normalize_time(aware)
self.assertTrue(naive < dt)
def test_normalize_zulu_aware_to_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
time_str = '2011-02-14T19:53:07Z'
aware = timeutils.parse_isotime(time_str)
naive = timeutils.normalize_time(aware)
self.assertTrue(naive < dt)
def test_normalize_naive(self):
dt = datetime.datetime(2011, 2, 14, 20, 53, 7)
dtn = datetime.datetime(2011, 2, 14, 19, 53, 7)
naive = timeutils.normalize_time(dtn)
self.assertTrue(naive < dt)

View File

@ -1,40 +0,0 @@
# Copyright 2013 IBM Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base as test_base
from openstack.common import units
class UnitTest(test_base.BaseTestCase):
def test_binary_unit(self):
self.assertEqual(units.Ki, 1024)
self.assertEqual(units.Mi, 1024 ** 2)
self.assertEqual(units.Gi, 1024 ** 3)
self.assertEqual(units.Ti, 1024 ** 4)
self.assertEqual(units.Pi, 1024 ** 5)
self.assertEqual(units.Ei, 1024 ** 6)
self.assertEqual(units.Zi, 1024 ** 7)
self.assertEqual(units.Yi, 1024 ** 8)
def test_decimal_unit(self):
self.assertEqual(units.k, 1000)
self.assertEqual(units.M, 1000 ** 2)
self.assertEqual(units.G, 1000 ** 3)
self.assertEqual(units.T, 1000 ** 4)
self.assertEqual(units.P, 1000 ** 5)
self.assertEqual(units.E, 1000 ** 6)
self.assertEqual(units.Z, 1000 ** 7)
self.assertEqual(units.Y, 1000 ** 8)

12
tox.ini
View File

@ -73,27 +73,21 @@ commands =
tests.unit.test_cliutils \
tests.unit.test_context \
tests.unit.test_deprecated \
tests.unit.test_excutils \
tests.unit.test_fileutils \
tests.unit.test_funcutils \
tests.unit.test_hooks \
tests.unit.test_imageutils \
tests.unit.test_importutils \
tests.unit.test_jsonutils \
tests.unit.test_local \
tests.unit.test_lockutils \
tests.unit.test_log \
tests.unit.test_memorycache \
tests.unit.test_network_utils \
tests.unit.test_periodic \
tests.unit.test_policy \
tests.unit.test_quota \
tests.unit.test_request_utils \
tests.unit.test_sslutils \
tests.unit.test_strutils \
tests.unit.test_systemd \
tests.unit.test_timeutils \
tests.unit.test_units \
tests.unit.test_uuidutils \
tests.unit.test_versionutils \
tests.unit.test_xmlutils
@ -130,28 +124,22 @@ commands =
tests.unit.test_cliutils \
tests.unit.test_context \
tests.unit.test_deprecated \
tests.unit.test_excutils \
tests.unit.test_fileutils \
tests.unit.test_funcutils \
tests.unit.test_hooks \
tests.unit.test_imageutils \
tests.unit.test_importutils \
tests.unit.test_jsonutils \
tests.unit.test_local \
tests.unit.test_lockutils \
tests.unit.test_log \
tests.unit.test_memorycache \
tests.unit.test_network_utils \
tests.unit.test_notifier \
tests.unit.test_periodic \
tests.unit.test_policy \
tests.unit.test_quota \
tests.unit.test_request_utils \
tests.unit.test_sslutils \
tests.unit.test_strutils \
tests.unit.test_systemd \
tests.unit.test_timeutils \
tests.unit.test_units \
tests.unit.test_uuidutils \
tests.unit.test_versionutils \
tests.unit.test_xmlutils