Remove some unused functions
Some functions in common/utils is useless for more than 3 releases. This patch removed them. Change-Id: I328b2fcf99590417e9813d7d95a2df9c0615d488 bp: removed-as-of-rocky
This commit is contained in:
parent
f0f1bdefec
commit
f45f922a52
|
@ -16,7 +16,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import calendar
|
||||
import collections
|
||||
import grp
|
||||
import hashlib
|
||||
|
@ -137,48 +136,6 @@ def remove_duplicate_dicts_by_id(item_list):
|
|||
return list(unique.values())
|
||||
|
||||
|
||||
def get_blob_from_credential(credential):
|
||||
try:
|
||||
blob = jsonutils.loads(credential.blob)
|
||||
except (ValueError, TypeError):
|
||||
raise exception.ValidationError(
|
||||
message=_('Invalid blob in credential'))
|
||||
if not blob or not isinstance(blob, dict):
|
||||
raise exception.ValidationError(attribute='blob',
|
||||
target='credential')
|
||||
return blob
|
||||
|
||||
|
||||
def convert_ec2_to_v3_credential(ec2credential):
|
||||
blob = {'access': ec2credential.access,
|
||||
'secret': ec2credential.secret}
|
||||
return {'id': hash_access_key(ec2credential.access),
|
||||
'user_id': ec2credential.user_id,
|
||||
'project_id': ec2credential.tenant_id,
|
||||
'blob': jsonutils.dumps(blob),
|
||||
'type': 'ec2',
|
||||
'extra': jsonutils.dumps({})}
|
||||
|
||||
|
||||
def convert_v3_to_ec2_credential(credential):
|
||||
blob = get_blob_from_credential(credential)
|
||||
return {'access': blob.get('access'),
|
||||
'secret': blob.get('secret'),
|
||||
'user_id': credential.user_id,
|
||||
'tenant_id': credential.project_id,
|
||||
}
|
||||
|
||||
|
||||
def unixtime(dt_obj):
|
||||
"""Format datetime object as unix timestamp.
|
||||
|
||||
:param dt_obj: datetime.datetime object
|
||||
:returns: float
|
||||
|
||||
"""
|
||||
return calendar.timegm(dt_obj.utctimetuple())
|
||||
|
||||
|
||||
def auth_str_equal(provided, known):
|
||||
"""Constant-time string comparison.
|
||||
|
||||
|
@ -341,109 +298,6 @@ def get_unix_group(group=None):
|
|||
return group_info.gr_gid, group_info.gr_name
|
||||
|
||||
|
||||
def set_permissions(path, mode=None, user=None, group=None, log=None):
|
||||
"""Set the ownership and permissions on the pathname.
|
||||
|
||||
Each of the mode, user and group are optional, if None then
|
||||
that aspect is not modified.
|
||||
|
||||
Owner and group may be specified either with a symbolic name
|
||||
or numeric id.
|
||||
|
||||
:param string path: Pathname of directory whose existence is assured.
|
||||
:param object mode: ownership permissions flags (int) i.e. chmod,
|
||||
if None do not set.
|
||||
:param object user: set user, name (string) or uid (integer),
|
||||
if None do not set.
|
||||
:param object group: set group, name (string) or gid (integer)
|
||||
if None do not set.
|
||||
:param logger log: logging.logger object, used to emit log messages,
|
||||
if None no logging is performed.
|
||||
|
||||
"""
|
||||
if user is None:
|
||||
user_uid, user_name = None, None
|
||||
else:
|
||||
user_uid, user_name = get_unix_user(user)
|
||||
|
||||
if group is None:
|
||||
group_gid, group_name = None, None
|
||||
else:
|
||||
group_gid, group_name = get_unix_group(group)
|
||||
|
||||
if log:
|
||||
if mode is None:
|
||||
mode_string = str(mode)
|
||||
else:
|
||||
mode_string = oct(mode)
|
||||
log.debug("set_permissions: "
|
||||
"path='%s' mode=%s user=%s(%s) group=%s(%s)",
|
||||
path, mode_string,
|
||||
user_name, user_uid, group_name, group_gid)
|
||||
|
||||
# Change user and group if specified
|
||||
if user_uid is not None or group_gid is not None:
|
||||
if user_uid is None:
|
||||
user_uid = -1
|
||||
if group_gid is None:
|
||||
group_gid = -1
|
||||
try:
|
||||
os.chown(path, user_uid, group_gid)
|
||||
except OSError as exc:
|
||||
raise EnvironmentError("chown('%s', %s, %s): %s" %
|
||||
(path,
|
||||
user_name, group_name,
|
||||
exc.strerror))
|
||||
|
||||
# Change permission flags
|
||||
if mode is not None:
|
||||
try:
|
||||
os.chmod(path, mode)
|
||||
except OSError as exc:
|
||||
raise EnvironmentError("chmod('%s', %#o): %s" %
|
||||
(path, mode, exc.strerror))
|
||||
|
||||
|
||||
def make_dirs(path, mode=None, user=None, group=None, log=None):
|
||||
"""Assure directory exists, set ownership and permissions.
|
||||
|
||||
Assure the directory exists and optionally set its ownership
|
||||
and permissions.
|
||||
|
||||
Each of the mode, user and group are optional, if None then
|
||||
that aspect is not modified.
|
||||
|
||||
Owner and group may be specified either with a symbolic name
|
||||
or numeric id.
|
||||
|
||||
:param string path: Pathname of directory whose existence is assured.
|
||||
:param object mode: ownership permissions flags (int) i.e. chmod,
|
||||
if None do not set.
|
||||
:param object user: set user, name (string) or uid (integer),
|
||||
if None do not set.
|
||||
:param object group: set group, name (string) or gid (integer)
|
||||
if None do not set.
|
||||
:param logger log: logging.logger object, used to emit log messages,
|
||||
if None no logging is performed.
|
||||
|
||||
"""
|
||||
if log:
|
||||
if mode is None:
|
||||
mode_string = str(mode)
|
||||
else:
|
||||
mode_string = oct(mode)
|
||||
log.debug("make_dirs path='%s' mode=%s user=%s group=%s",
|
||||
path, mode_string, user, group)
|
||||
|
||||
if not os.path.exists(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except OSError as exc:
|
||||
raise EnvironmentError("makedirs('%s'): %s" % (path, exc.strerror))
|
||||
|
||||
set_permissions(path, mode, user, group, log)
|
||||
|
||||
|
||||
class WhiteListedItemFilter(object):
|
||||
|
||||
def __init__(self, whitelist, data):
|
||||
|
|
|
@ -209,20 +209,6 @@ class UtilsTestCase(unit.BaseTestCase):
|
|||
self.assertFalse(common_utils.auth_str_equal('aaaaa', 'a'))
|
||||
self.assertFalse(common_utils.auth_str_equal('ABC123', 'abc123'))
|
||||
|
||||
def test_unixtime(self):
|
||||
global TZ
|
||||
|
||||
@utils.timezone
|
||||
def _test_unixtime():
|
||||
epoch = common_utils.unixtime(dt)
|
||||
self.assertEqual(epoch, epoch_ans, "TZ=%s" % TZ)
|
||||
|
||||
dt = datetime.datetime(1970, 1, 2, 3, 4, 56, 0)
|
||||
epoch_ans = 56 + 4 * 60 + 3 * 3600 + 86400
|
||||
for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:
|
||||
TZ = 'UTC' + d
|
||||
_test_unixtime()
|
||||
|
||||
def test_url_safe_check(self):
|
||||
base_str = 'i am safe'
|
||||
self.assertFalse(common_utils.is_not_url_safe(base_str))
|
||||
|
|
Loading…
Reference in New Issue