diff --git a/swauth/middleware.py b/swauth/middleware.py index f451197..7955b8e 100644 --- a/swauth/middleware.py +++ b/swauth/middleware.py @@ -1521,14 +1521,21 @@ class Swauth(object): def credentials_match(self, user_detail, key): """Returns True if the key is valid for the user_detail. - It will use self.auth_encoder to check for a key match. + It will use auth_encoder type the password was encoded with, + to check for a key match. :param user_detail: The dict for the user. :param key: The key to validate for the user. :returns: True if the key is valid for the user, False if not. """ - return user_detail and self.auth_encoder().match( - key, user_detail.get('auth')) + if user_detail: + creds = user_detail.get('auth') + auth_type = creds.split(':')[0] + auth_encoder = getattr(swauth.authtypes, auth_type.title(), None) + if auth_encoder is None: + self.logger.error('Invalid auth_type %s' % auth_type) + return False + return user_detail and auth_encoder().match(key, creds) def is_user_changing_own_key(self, req, user): """Check if the user is changing his own key. diff --git a/test/unit/test_middleware.py b/test/unit/test_middleware.py index bda53d7..60f40a5 100644 --- a/test/unit/test_middleware.py +++ b/test/unit/test_middleware.py @@ -238,6 +238,31 @@ class TestAuth(unittest.TestCase): self.assertEqual(ath.dsc_url, 'https://host/path') self.assertEqual(ath.dsc_url2, 'http://host2/path2') + def test_credentials_match_auth_encoder_type(self): + plaintext_auth = {'auth': 'plaintext:key'} + sha1_key = ("sha1:T0YFdhqN4uDRWiYLxWa7H2T8AewG4fEYQyJFRLsgcfk=$46c58" + "07eb8a32e8f404fea9eaaeb60b7e1207ff1") + sha1_auth = {'auth': sha1_key} + sha512_key = ("sha512:aSm0jEeqIp46T5YLZy1r8+cXs/Xzs1S4VUwVauhBs44=$ef" + "7332ec1288bf69c75682eb8d459d5a84baa7e43f45949c242a9af9" + "7130ef16ac361fe1aa33a789e218122b83c54ef1923fc015080741" + "ca21f6187329f6cb7a") + sha512_auth = {'auth': sha512_key} + + # test all possible config settings work with all possible auth types + for auth_type in ('plaintext', 'sha1', 'sha512'): + test_auth = auth.filter_factory({'super_admin_key': 'superkey', + 'auth_type': auth_type})(FakeApp()) + for detail in (plaintext_auth, sha1_auth, sha512_auth): + self.assertTrue(test_auth.credentials_match(detail, 'key')) + # test invalid auth type stored + invalid_detail = {'auth': 'Junk:key'} + test_auth.logger = mock.Mock() + self.assertFalse(test_auth.credentials_match(invalid_detail, + 'key')) + # make sure error is logged + test_auth.logger.called_once_with('Invalid auth_type Junk') + def test_top_level_denied(self): resp = Request.blank('/').get_response(self.test_auth) self.assertEqual(resp.status_int, 401)