diff --git a/keystone/common/utils.py b/keystone/common/utils.py index 38cee7e22b..4073edd57e 100644 --- a/keystone/common/utils.py +++ b/keystone/common/utils.py @@ -16,7 +16,6 @@ # License for the specific language governing permissions and limitations # under the License. -import calendar import collections import grp import hashlib @@ -137,48 +136,6 @@ def remove_duplicate_dicts_by_id(item_list): return list(unique.values()) -def get_blob_from_credential(credential): - try: - blob = jsonutils.loads(credential.blob) - except (ValueError, TypeError): - raise exception.ValidationError( - message=_('Invalid blob in credential')) - if not blob or not isinstance(blob, dict): - raise exception.ValidationError(attribute='blob', - target='credential') - return blob - - -def convert_ec2_to_v3_credential(ec2credential): - blob = {'access': ec2credential.access, - 'secret': ec2credential.secret} - return {'id': hash_access_key(ec2credential.access), - 'user_id': ec2credential.user_id, - 'project_id': ec2credential.tenant_id, - 'blob': jsonutils.dumps(blob), - 'type': 'ec2', - 'extra': jsonutils.dumps({})} - - -def convert_v3_to_ec2_credential(credential): - blob = get_blob_from_credential(credential) - return {'access': blob.get('access'), - 'secret': blob.get('secret'), - 'user_id': credential.user_id, - 'tenant_id': credential.project_id, - } - - -def unixtime(dt_obj): - """Format datetime object as unix timestamp. - - :param dt_obj: datetime.datetime object - :returns: float - - """ - return calendar.timegm(dt_obj.utctimetuple()) - - def auth_str_equal(provided, known): """Constant-time string comparison. @@ -341,109 +298,6 @@ def get_unix_group(group=None): return group_info.gr_gid, group_info.gr_name -def set_permissions(path, mode=None, user=None, group=None, log=None): - """Set the ownership and permissions on the pathname. - - Each of the mode, user and group are optional, if None then - that aspect is not modified. - - Owner and group may be specified either with a symbolic name - or numeric id. - - :param string path: Pathname of directory whose existence is assured. - :param object mode: ownership permissions flags (int) i.e. chmod, - if None do not set. - :param object user: set user, name (string) or uid (integer), - if None do not set. - :param object group: set group, name (string) or gid (integer) - if None do not set. - :param logger log: logging.logger object, used to emit log messages, - if None no logging is performed. - - """ - if user is None: - user_uid, user_name = None, None - else: - user_uid, user_name = get_unix_user(user) - - if group is None: - group_gid, group_name = None, None - else: - group_gid, group_name = get_unix_group(group) - - if log: - if mode is None: - mode_string = str(mode) - else: - mode_string = oct(mode) - log.debug("set_permissions: " - "path='%s' mode=%s user=%s(%s) group=%s(%s)", - path, mode_string, - user_name, user_uid, group_name, group_gid) - - # Change user and group if specified - if user_uid is not None or group_gid is not None: - if user_uid is None: - user_uid = -1 - if group_gid is None: - group_gid = -1 - try: - os.chown(path, user_uid, group_gid) - except OSError as exc: - raise EnvironmentError("chown('%s', %s, %s): %s" % - (path, - user_name, group_name, - exc.strerror)) - - # Change permission flags - if mode is not None: - try: - os.chmod(path, mode) - except OSError as exc: - raise EnvironmentError("chmod('%s', %#o): %s" % - (path, mode, exc.strerror)) - - -def make_dirs(path, mode=None, user=None, group=None, log=None): - """Assure directory exists, set ownership and permissions. - - Assure the directory exists and optionally set its ownership - and permissions. - - Each of the mode, user and group are optional, if None then - that aspect is not modified. - - Owner and group may be specified either with a symbolic name - or numeric id. - - :param string path: Pathname of directory whose existence is assured. - :param object mode: ownership permissions flags (int) i.e. chmod, - if None do not set. - :param object user: set user, name (string) or uid (integer), - if None do not set. - :param object group: set group, name (string) or gid (integer) - if None do not set. - :param logger log: logging.logger object, used to emit log messages, - if None no logging is performed. - - """ - if log: - if mode is None: - mode_string = str(mode) - else: - mode_string = oct(mode) - log.debug("make_dirs path='%s' mode=%s user=%s group=%s", - path, mode_string, user, group) - - if not os.path.exists(path): - try: - os.makedirs(path) - except OSError as exc: - raise EnvironmentError("makedirs('%s'): %s" % (path, exc.strerror)) - - set_permissions(path, mode, user, group, log) - - class WhiteListedItemFilter(object): def __init__(self, whitelist, data): diff --git a/keystone/tests/unit/common/test_utils.py b/keystone/tests/unit/common/test_utils.py index 7865b8d8ef..0b48fd3238 100644 --- a/keystone/tests/unit/common/test_utils.py +++ b/keystone/tests/unit/common/test_utils.py @@ -209,20 +209,6 @@ class UtilsTestCase(unit.BaseTestCase): self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a')) self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123')) - def test_unixtime(self): - global TZ - - @utils.timezone - def _test_unixtime(): - epoch = common_utils.unixtime(dt) - self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ) - - dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0) - epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400 - for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: - TZ = 'UTC' + d - _test_unixtime() - def test_url_safe_check(self): base_str = 'i am safe' self.assertFalse(common_utils.is_not_url_safe(base_str))