Merge "Process and validate auth methods against MFA rules"
This commit is contained in:
commit
e048d8895b
|
@ -12,4 +12,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# NOTE(notmorgan): Be careful in adjusting whitespace here, flake8 checks
|
||||
# get cranky.
|
||||
from keystone.auth import core # noqa
|
||||
|
||||
from keystone.auth import controllers # noqa
|
||||
|
|
|
@ -16,12 +16,10 @@ import sys
|
|||
|
||||
from keystoneclient.common import cms
|
||||
from oslo_log import log
|
||||
from oslo_log import versionutils
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
import stevedore
|
||||
|
||||
from keystone.auth import core
|
||||
from keystone.auth import schema
|
||||
from keystone.common import controller
|
||||
from keystone.common import dependency
|
||||
|
@ -39,54 +37,15 @@ LOG = log.getLogger(__name__)
|
|||
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
# registry of authentication methods
|
||||
AUTH_METHODS = {}
|
||||
AUTH_PLUGINS_LOADED = False
|
||||
# TODO(notmorgan): Update all references to the following functions to
|
||||
# reference auth.core instead of auth.controllers
|
||||
get_auth_method = core.get_auth_method
|
||||
load_auth_method = core.load_auth_method
|
||||
load_auth_methods = core.load_auth_methods
|
||||
|
||||
|
||||
def load_auth_method(method):
|
||||
plugin_name = CONF.auth.get(method) or 'default'
|
||||
namespace = 'keystone.auth.%s' % method
|
||||
try:
|
||||
driver_manager = stevedore.DriverManager(namespace, plugin_name,
|
||||
invoke_on_load=True)
|
||||
return driver_manager.driver
|
||||
except RuntimeError:
|
||||
LOG.debug('Failed to load the %s driver (%s) using stevedore, will '
|
||||
'attempt to load using import_object instead.',
|
||||
method, plugin_name)
|
||||
|
||||
driver = importutils.import_object(plugin_name)
|
||||
|
||||
msg = (_(
|
||||
'Direct import of auth plugin %(name)r is deprecated as of Liberty in '
|
||||
'favor of its entrypoint from %(namespace)r and may be removed in '
|
||||
'N.') %
|
||||
{'name': plugin_name, 'namespace': namespace})
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
|
||||
return driver
|
||||
|
||||
|
||||
def load_auth_methods():
|
||||
global AUTH_PLUGINS_LOADED
|
||||
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
# Only try and load methods a single time.
|
||||
return
|
||||
# config.setup_authentication should be idempotent, call it to ensure we
|
||||
# have setup all the appropriate configuration options we may need.
|
||||
keystone.conf.auth.setup_authentication()
|
||||
for plugin in set(CONF.auth.methods):
|
||||
AUTH_METHODS[plugin] = load_auth_method(plugin)
|
||||
AUTH_PLUGINS_LOADED = True
|
||||
|
||||
|
||||
def get_auth_method(method_name):
|
||||
global AUTH_METHODS
|
||||
if method_name not in AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
return AUTH_METHODS[method_name]
|
||||
# TODO(notmorgan): Move Common Auth Code (AuthContext and AuthInfo)
|
||||
# loading into keystone.auth.core (and update all references to the new
|
||||
# locations)
|
||||
|
||||
|
||||
class AuthContext(dict):
|
||||
|
@ -279,7 +238,7 @@ class AuthInfo(object):
|
|||
|
||||
# make sure auth method is supported
|
||||
for method_name in self.get_method_names():
|
||||
if method_name not in AUTH_METHODS:
|
||||
if method_name not in core.AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
|
||||
def _validate_and_normalize_auth_data(self, scope_only=False):
|
||||
|
@ -427,6 +386,7 @@ class Auth(controller.V3Controller):
|
|||
def __init__(self, *args, **kw):
|
||||
super(Auth, self).__init__(*args, **kw)
|
||||
keystone.conf.auth.setup_authentication()
|
||||
self._mfa_rules_validator = core.UserMFARulesValidator()
|
||||
|
||||
def authenticate_for_token(self, request, auth=None):
|
||||
"""Authenticate user and issue a token."""
|
||||
|
@ -447,10 +407,18 @@ class Auth(controller.V3Controller):
|
|||
|
||||
# NOTE(notmorgan): only methods that actually run and succeed will
|
||||
# be in the auth_context['method_names'] list. Do not blindly take
|
||||
# the values from auth_info, look at the authoritative values.
|
||||
method_names = auth_context.get('method_names', [])
|
||||
# make sure the list is unique
|
||||
method_names = list(set(method_names))
|
||||
# the values from auth_info, look at the authoritative values. Make
|
||||
# sure the set is unique.
|
||||
method_names_set = set(auth_context.get('method_names', []))
|
||||
method_names = list(method_names_set)
|
||||
|
||||
# Do MFA Rule Validation for the user
|
||||
if not self._mfa_rules_validator.check_auth_methods_against_rules(
|
||||
auth_context['user_id'], method_names_set):
|
||||
raise exception.InsufficientAuthMethods(
|
||||
user_id=auth_context['user_id'],
|
||||
methods='[%s]' % ','.join(auth_info.get_method_names()))
|
||||
|
||||
expires_at = auth_context.get('expires_at')
|
||||
token_audit_id = auth_context.get('audit_id')
|
||||
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_log import versionutils
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
import stevedore
|
||||
|
||||
from keystone.common import dependency
|
||||
import keystone.conf
|
||||
from keystone import exception
|
||||
from keystone.i18n import _, _LI, _LE
|
||||
from keystone.identity.backends import resource_options as ro
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
|
||||
# registry of authentication methods
|
||||
AUTH_METHODS = {}
|
||||
AUTH_PLUGINS_LOADED = False
|
||||
|
||||
|
||||
def load_auth_method(method):
|
||||
plugin_name = CONF.auth.get(method) or 'default'
|
||||
namespace = 'keystone.auth.%s' % method
|
||||
try:
|
||||
driver_manager = stevedore.DriverManager(namespace, plugin_name,
|
||||
invoke_on_load=True)
|
||||
return driver_manager.driver
|
||||
except RuntimeError:
|
||||
LOG.debug('Failed to load the %s driver (%s) using stevedore, will '
|
||||
'attempt to load using import_object instead.',
|
||||
method, plugin_name)
|
||||
|
||||
driver = importutils.import_object(plugin_name)
|
||||
|
||||
msg = (_(
|
||||
'Direct import of auth plugin %(name)r is deprecated as of Liberty in '
|
||||
'favor of its entrypoint from %(namespace)r and may be removed in '
|
||||
'N.') %
|
||||
{'name': plugin_name, 'namespace': namespace})
|
||||
versionutils.report_deprecated_feature(LOG, msg)
|
||||
|
||||
return driver
|
||||
|
||||
|
||||
def load_auth_methods():
|
||||
global AUTH_PLUGINS_LOADED
|
||||
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
# Only try and load methods a single time.
|
||||
return
|
||||
# config.setup_authentication should be idempotent, call it to ensure we
|
||||
# have setup all the appropriate configuration options we may need.
|
||||
keystone.conf.auth.setup_authentication()
|
||||
for plugin in set(CONF.auth.methods):
|
||||
AUTH_METHODS[plugin] = load_auth_method(plugin)
|
||||
AUTH_PLUGINS_LOADED = True
|
||||
|
||||
|
||||
def get_auth_method(method_name):
|
||||
global AUTH_METHODS
|
||||
if method_name not in AUTH_METHODS:
|
||||
raise exception.AuthMethodNotSupported()
|
||||
return AUTH_METHODS[method_name]
|
||||
|
||||
|
||||
@dependency.requires('identity_api')
|
||||
class UserMFARulesValidator(object):
|
||||
"""Helper object that can validate the MFA Rules."""
|
||||
|
||||
@property
|
||||
def _auth_methods(self):
|
||||
if AUTH_PLUGINS_LOADED:
|
||||
return set(AUTH_METHODS.keys())
|
||||
raise RuntimeError(_('Auth Method Plugins are not loaded.'))
|
||||
|
||||
def check_auth_methods_against_rules(self, user_id, auth_methods):
|
||||
"""Validate the MFA rules against the successful auth methods.
|
||||
|
||||
:param user_id: The user's ID (uuid).
|
||||
:type user_id: str
|
||||
:param auth_methods: List of methods that were used for auth
|
||||
:type auth_methods: set
|
||||
:returns: Boolean, ``True`` means rules match and auth may proceed,
|
||||
``False`` means rules do not match.
|
||||
"""
|
||||
user_ref = self.identity_api.get_user(user_id)
|
||||
mfa_rules = user_ref['options'].get(ro.MFA_RULES_OPT.option_name, [])
|
||||
mfa_rules_enabled = user_ref['options'].get(
|
||||
ro.MFA_ENABLED_OPT.option_name, True)
|
||||
rules = self._parse_rule_structure(mfa_rules, user_ref['id'])
|
||||
|
||||
if not rules or not mfa_rules_enabled:
|
||||
# return quickly if the rules are disabled for the user or not set
|
||||
LOG.debug('MFA Rules not processed for user `%(user_id)s`. '
|
||||
'Rule list: `%(rules)s` (Enabled: `%(enabled)s`).',
|
||||
{'user_id': user_id,
|
||||
'rules': mfa_rules,
|
||||
'enabled': mfa_rules_enabled})
|
||||
return True
|
||||
|
||||
for r in rules:
|
||||
# NOTE(notmorgan): We only check against the actually loaded
|
||||
# auth methods meaning that the keystone administrator may
|
||||
# disable an auth method, and a rule will still pass making it
|
||||
# impossible to accidently lock-out a subset of users with a
|
||||
# bad keystone.conf
|
||||
r_set = set(r).intersection(self._auth_methods)
|
||||
if set(auth_methods).issuperset(r_set):
|
||||
# Rule Matches no need to continue, return here.
|
||||
LOG.debug('Auth methods for user `%(user_id)s`, `%(methods)s` '
|
||||
'matched MFA rule `%(rule)s`. Loaded '
|
||||
'auth_methods: `%(loaded)s`',
|
||||
{'user_id': user_id,
|
||||
'rule': list(r_set),
|
||||
'methods': auth_methods,
|
||||
'loaded': self._auth_methods})
|
||||
return True
|
||||
|
||||
LOG.debug('Auth methods for user `%(user_id)s`, `%(methods)s` did not '
|
||||
'match a MFA rule in `%(rules)s`.',
|
||||
{'user_id': user_id,
|
||||
'methods': auth_methods,
|
||||
'rules': rules})
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _parse_rule_structure(rules, user_id):
|
||||
"""Validate and parse the rule data structure.
|
||||
|
||||
Rule sets must be in the form of list of lists. The lists may not
|
||||
have duplicates and must not be empty. The top-level list may be empty
|
||||
indicating that no rules exist.
|
||||
|
||||
:param rules: The list of rules from the user_ref
|
||||
:type rules: list
|
||||
:param user_id: the user_id, used for logging purposes
|
||||
:type user_id: str
|
||||
:returns: list of list, duplicates are stripped
|
||||
"""
|
||||
# NOTE(notmorgan): Most of this is done at the API request validation
|
||||
# and in the storage layer, it makes sense to also validate here and
|
||||
# ensure the data returned from the DB is sane, This will not raise
|
||||
# any exceptions, but just produce a usable set of data for rules
|
||||
# processing.
|
||||
rule_set = []
|
||||
found_rules = set()
|
||||
if not isinstance(rules, list):
|
||||
LOG.error(_LE('Corrupt rule data structure for user %(user_id)s, '
|
||||
'no rules loaded.'),
|
||||
{'user_id': user_id})
|
||||
return rule_set
|
||||
elif not rules:
|
||||
return rule_set
|
||||
|
||||
for r_list in rules:
|
||||
if not isinstance(r_list, list):
|
||||
LOG.info(_LI('Ignoring Rule %(rule)r; rule must be a list of '
|
||||
'strings.'),
|
||||
{'type': type(r_list)})
|
||||
continue
|
||||
|
||||
if r_list:
|
||||
# No empty rules are allowed.
|
||||
_ok_rule = True
|
||||
for item in r_list:
|
||||
if not isinstance(item, six.string_types):
|
||||
# Rules may only contain strings for method names
|
||||
# Reject a rule with non-string values
|
||||
LOG.info(_LI('Ignoring Rule %(rule)r; rule contains '
|
||||
'non-string values.'),
|
||||
{'rule': r_list})
|
||||
_ok_rule = False
|
||||
break
|
||||
if _ok_rule:
|
||||
# De-dupe rule and add to the return value
|
||||
rule_string = ';'.join(sorted(r_list))
|
||||
if rule_string not in found_rules:
|
||||
found_rules.add(rule_string)
|
||||
r_list = list(set(r_list))
|
||||
rule_set.append(r_list)
|
||||
|
||||
return rule_set
|
|
@ -253,6 +253,15 @@ class Unauthorized(SecurityError):
|
|||
title = 'Unauthorized'
|
||||
|
||||
|
||||
class InsufficientAuthMethods(Error):
|
||||
# NOTE(notmorgan): This is not a security error, this is meant to
|
||||
# communicate real information back to the user.
|
||||
message_format = _("Insufficient auth methods received for %(user_id)s. "
|
||||
"Auth Methods Provided: %(methods)s.")
|
||||
code = 401
|
||||
title = 'Unauthorized'
|
||||
|
||||
|
||||
class PasswordExpired(Unauthorized):
|
||||
message_format = _("The password is expired and needs to be changed for "
|
||||
"user: %(user_id)s.")
|
||||
|
|
|
@ -30,22 +30,22 @@ def _mfa_rules_validator_list_of_lists_of_strings_no_duplicates(value):
|
|||
'duplicated.')
|
||||
if not isinstance(value, list):
|
||||
raise TypeError(msg)
|
||||
sublist_set = set()
|
||||
sublists = []
|
||||
for item in value:
|
||||
string_set = set()
|
||||
if not isinstance(item, list):
|
||||
raise TypeError(msg)
|
||||
if not item:
|
||||
raise ValueError(msg)
|
||||
if item in sublist_set:
|
||||
if item in sublists:
|
||||
raise ValueError(msg)
|
||||
sublist_set.add(sublist_set)
|
||||
sublists.append(sublists)
|
||||
for element in item:
|
||||
if not isinstance(element, six.string_types):
|
||||
raise TypeError(msg)
|
||||
if element in string_set:
|
||||
raise ValueError(msg)
|
||||
sublist_set.add(element)
|
||||
string_set.add(element)
|
||||
|
||||
|
||||
USER_OPTIONS_REGISTRY = resource_options.ResourceOptionRegistry('USER')
|
||||
|
|
|
@ -48,19 +48,19 @@ class LoadAuthPlugins(fixtures.Fixture):
|
|||
def setUp(self):
|
||||
super(LoadAuthPlugins, self).setUp()
|
||||
|
||||
AUTH_METHODS = auth.controllers.AUTH_METHODS
|
||||
AUTH_METHODS = auth.core.AUTH_METHODS
|
||||
for method_name in self.method_names:
|
||||
if method_name in AUTH_METHODS:
|
||||
self.saved[method_name] = AUTH_METHODS[method_name]
|
||||
AUTH_METHODS[method_name] = auth.controllers.load_auth_method(
|
||||
AUTH_METHODS[method_name] = auth.core.load_auth_method(
|
||||
method_name)
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = True
|
||||
auth.core.AUTH_PLUGINS_LOADED = True
|
||||
|
||||
def cleanUp(self):
|
||||
AUTH_METHODS = auth.controllers.AUTH_METHODS
|
||||
AUTH_METHODS = auth.core.AUTH_METHODS
|
||||
for method_name in list(AUTH_METHODS):
|
||||
if method_name in self.saved:
|
||||
AUTH_METHODS[method_name] = self.saved[method_name]
|
||||
else:
|
||||
del AUTH_METHODS[method_name]
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = False
|
||||
auth.core.AUTH_PLUGINS_LOADED = False
|
||||
|
|
|
@ -51,5 +51,5 @@ class BackendLoader(fixtures.Fixture):
|
|||
del self._testcase # break circular reference
|
||||
|
||||
def clear_auth_plugin_registry(self):
|
||||
auth.controllers.AUTH_METHODS.clear()
|
||||
auth.controllers.AUTH_PLUGINS_LOADED = False
|
||||
auth.core.AUTH_METHODS.clear()
|
||||
auth.core.AUTH_PLUGINS_LOADED = False
|
||||
|
|
|
@ -115,7 +115,7 @@ class TestAuthPlugin(unit.SQLDriverOverrides, unit.TestCase):
|
|||
auth_plugins.ConfigAuthPlugins(self.config_fixture,
|
||||
['external', 'external']))
|
||||
auth.controllers.load_auth_methods()
|
||||
self.assertIn('external', auth.controllers.AUTH_METHODS)
|
||||
self.assertIn('external', auth.core.AUTH_METHODS)
|
||||
|
||||
|
||||
class TestAuthPluginDynamicOptions(TestAuthPlugin):
|
||||
|
|
|
@ -37,6 +37,7 @@ from keystone.common import utils
|
|||
import keystone.conf
|
||||
from keystone.credential.providers import fernet as credential_fernet
|
||||
from keystone import exception
|
||||
from keystone.identity.backends import resource_options as ro
|
||||
from keystone.policy.backends import rules
|
||||
from keystone.tests.common import auth as common_auth
|
||||
from keystone.tests import unit
|
||||
|
@ -47,6 +48,164 @@ from keystone.tests.unit import test_v3
|
|||
CONF = keystone.conf.CONF
|
||||
|
||||
|
||||
class TestMFARules(test_v3.RestfulTestCase, testcase.TestCase):
|
||||
def setUp(self):
|
||||
super(TestMFARules, self).setUp()
|
||||
auth.core.load_auth_methods()
|
||||
self.controller = auth.controllers.Auth()
|
||||
self.addCleanup(self.cleanup)
|
||||
|
||||
def cleanup(self):
|
||||
totp_creds = self.credential_api.list_credentials_for_user(
|
||||
self.user['id'], type='totp')
|
||||
|
||||
for cred in totp_creds:
|
||||
self.credential_api.delete_credential(cred['id'])
|
||||
|
||||
def auth_plugin_config_override(self, methods=None, **method_classes):
|
||||
methods = ['totp', 'token', 'password']
|
||||
super(TestMFARules, self).auth_plugin_config_override(methods)
|
||||
|
||||
def _update_user_with_MFA_rules(self, rule_list, rules_enabled=True):
|
||||
user = self.user.copy()
|
||||
# Do not update password
|
||||
user.pop('password')
|
||||
user['options'][ro.MFA_RULES_OPT.option_name] = rule_list
|
||||
user['options'][ro.MFA_ENABLED_OPT.option_name] = rules_enabled
|
||||
self.identity_api.update_user(user['id'], user)
|
||||
|
||||
def test_MFA_single_method_rules_requirements_met_succeeds(self):
|
||||
# ensure that a simple password works if a password-only rules exists
|
||||
rule_list = [['password'], ['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_multi_method_rules_requirements_met_succeeds(self):
|
||||
# validate that multiple auth-methods function if all are specified
|
||||
# and the rules requires it
|
||||
rule_list = [['password', 'totp']]
|
||||
totp_cred = unit.new_totp_credential(self.user_id, self.project_id)
|
||||
self.credential_api.create_credential(uuid.uuid4().hex, totp_cred)
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
auth_req = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
passcode=totp._generate_totp_passcode(totp_cred['blob']))
|
||||
self.v3_create_token(auth_req)
|
||||
|
||||
def test_MFA_single_method_rules_requirements_not_met_fails(self):
|
||||
# if a rule matching a single auth type is specified and is not matched
|
||||
# the result should be unauthorized
|
||||
rule_list = [['totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id),
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_MFA_multi_method_rules_requirements_not_met_fails(self):
|
||||
# if multiple rules are specified and only one is passed,
|
||||
# unauthorized is expected
|
||||
rule_list = [['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id),
|
||||
expected_status=http_client.UNAUTHORIZED)
|
||||
|
||||
def test_MFA_rules_bogus_non_existing_auth_method_succeeds(self):
|
||||
# Bogus auth methods are thrown out from rules.
|
||||
rule_list = [['password'], ['BoGusAuThMeTh0dHandl3r']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_rules_disabled_MFA_succeeeds(self):
|
||||
# ensure that if MFA is "disableD" authentication succeeds, even if
|
||||
# not enough auth methods are specified
|
||||
rule_list = [['password', 'totp']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list,
|
||||
rules_enabled=False)
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
def test_MFA_rules_all_bogus_rules_results_in_default_behavior(self):
|
||||
# if all the rules are bogus, the result is the same as the default
|
||||
# behavior, any single password method is sufficient
|
||||
rule_list = [[uuid.uuid4().hex, uuid.uuid4().hex],
|
||||
['BoGus'],
|
||||
['NonExistantMethod']]
|
||||
self._update_user_with_MFA_rules(rule_list=rule_list)
|
||||
# NOTE(notmorgan): Step forward in time to ensure we're not causing
|
||||
# issues with revocation events that occur at the same time as the
|
||||
# token issuance. This is a bug with the limited resolution that
|
||||
# tokens and revocation events have.
|
||||
time = datetime.datetime.utcnow() + datetime.timedelta(seconds=5)
|
||||
with freezegun.freeze_time(time):
|
||||
self.v3_create_token(
|
||||
self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.user['password'],
|
||||
user_domain_id=self.domain_id,
|
||||
project_id=self.project_id))
|
||||
|
||||
|
||||
class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
|
||||
def setUp(self):
|
||||
super(TestAuthInfo, self).setUp()
|
||||
|
|
Loading…
Reference in New Issue