add cadf notifications for oauth

Adds support for cadf notifications for create/update/delete of
consumers, request tokens and access tokens.

Change-Id: Iaa8e148cde3af881c8b573ab9d27bf366134c865
This commit is contained in:
Steve Martinelli 2015-02-25 04:03:33 -05:00
parent fb838489a4
commit 969f72b692
4 changed files with 170 additions and 26 deletions

View File

@ -59,7 +59,8 @@ class ConsumerCrudV3(controller.V3Controller):
@controller.protected()
def create_consumer(self, context, consumer):
ref = self._assign_unique_id(self._normalize_dict(consumer))
consumer_ref = self.oauth_api.create_consumer(ref)
initiator = notifications._get_request_audit_info(context)
consumer_ref = self.oauth_api.create_consumer(ref, initiator)
return ConsumerCrudV3.wrap_member(context, consumer_ref)
@controller.protected()
@ -67,7 +68,8 @@ class ConsumerCrudV3(controller.V3Controller):
self._require_matching_id(consumer_id, consumer)
ref = self._normalize_dict(consumer)
self._validate_consumer_ref(ref)
ref = self.oauth_api.update_consumer(consumer_id, ref)
initiator = notifications._get_request_audit_info(context)
ref = self.oauth_api.update_consumer(consumer_id, ref, initiator)
return ConsumerCrudV3.wrap_member(context, ref)
@controller.protected()
@ -89,7 +91,8 @@ class ConsumerCrudV3(controller.V3Controller):
payload = {'user_id': user_token_ref.user_id,
'consumer_id': consumer_id}
_emit_user_oauth_consumer_token_invalidate(payload)
self.oauth_api.delete_consumer(consumer_id)
initiator = notifications._get_request_audit_info(context)
self.oauth_api.delete_consumer(consumer_id, initiator)
def _validate_consumer_ref(self, consumer):
if 'secret' in consumer:
@ -138,8 +141,9 @@ class AccessTokenCrudV3(controller.V3Controller):
consumer_id = access_token['consumer_id']
payload = {'user_id': user_id, 'consumer_id': consumer_id}
_emit_user_oauth_consumer_token_invalidate(payload)
initiator = notifications._get_request_audit_info(context)
return self.oauth_api.delete_access_token(
user_id, access_token_id)
user_id, access_token_id, initiator)
@staticmethod
def _get_user_id(entity):
@ -245,9 +249,11 @@ class OAuthControllerV3(controller.V3Controller):
raise exception.Unauthorized(message=msg)
request_token_duration = CONF.oauth1.request_token_duration
initiator = notifications._get_request_audit_info(context)
token_ref = self.oauth_api.create_request_token(consumer_id,
requested_project_id,
request_token_duration)
request_token_duration,
initiator)
result = ('oauth_token=%(key)s&oauth_token_secret=%(secret)s'
% {'key': token_ref['id'],
@ -324,8 +330,10 @@ class OAuthControllerV3(controller.V3Controller):
raise exception.Unauthorized(message=msg)
access_token_duration = CONF.oauth1.access_token_duration
initiator = notifications._get_request_audit_info(context)
token_ref = self.oauth_api.create_access_token(request_token_id,
access_token_duration)
access_token_duration,
initiator)
result = ('oauth_token=%(key)s&oauth_token_secret=%(secret)s'
% {'key': token_ref['id'],

View File

@ -158,32 +158,41 @@ class Manager(manager.Manager):
def __init__(self):
super(Manager, self).__init__(CONF.oauth1.driver)
@notifications.created(_CONSUMER)
def create_consumer(self, consumer_ref):
return self.driver.create_consumer(consumer_ref)
def create_consumer(self, consumer_ref, initiator=None):
ret = self.driver.create_consumer(consumer_ref)
notifications.Audit.created(self._CONSUMER, ret['id'], initiator)
return ret
@notifications.updated(_CONSUMER)
def update_consumer(self, consumer_id, consumer_ref):
return self.driver.update_consumer(consumer_id, consumer_ref)
def update_consumer(self, consumer_id, consumer_ref, initiator=None):
ret = self.driver.update_consumer(consumer_id, consumer_ref)
notifications.Audit.updated(self._CONSUMER, consumer_id, initiator)
return ret
@notifications.deleted(_CONSUMER)
def delete_consumer(self, consumer_id):
return self.driver.delete_consumer(consumer_id)
def delete_consumer(self, consumer_id, initiator=None):
ret = self.driver.delete_consumer(consumer_id)
notifications.Audit.deleted(self._CONSUMER, consumer_id, initiator)
return ret
@notifications.created(_ACCESS_TOKEN)
def create_access_token(self, request_id, access_token_duration):
return self.driver.create_access_token(request_id,
access_token_duration)
def create_access_token(self, request_id, access_token_duration,
initiator=None):
ret = self.driver.create_access_token(request_id,
access_token_duration)
notifications.Audit.created(self._ACCESS_TOKEN, ret['id'], initiator)
return ret
@notifications.deleted(_ACCESS_TOKEN, resource_id_arg_index=2)
def delete_access_token(self, user_id, access_token_id):
return self.driver.delete_access_token(user_id, access_token_id)
def delete_access_token(self, user_id, access_token_id, initiator=None):
ret = self.driver.delete_access_token(user_id, access_token_id)
notifications.Audit.deleted(self._ACCESS_TOKEN, access_token_id,
initiator)
return ret
@notifications.created(_REQUEST_TOKEN, resource_id_arg_index=2)
def create_request_token(self, consumer_id, requested_project,
request_token_duration):
return self.driver.create_request_token(
request_token_duration, initiator=None):
ret = self.driver.create_request_token(
consumer_id, requested_project, request_token_duration)
notifications.Audit.created(self._REQUEST_TOKEN, ret['id'],
initiator)
return ret
@six.add_metaclass(abc.ABCMeta)

View File

@ -66,7 +66,10 @@ CADF_TYPE_MAP = {
'endpoint': taxonomy.SECURITY_ENDPOINT,
'service': taxonomy.SECURITY_SERVICE,
'policy': taxonomy.SECURITY_POLICY,
'OS-TRUST:trust': taxonomy.SECURITY_TRUST
'OS-TRUST:trust': taxonomy.SECURITY_TRUST,
'OS-OAUTH1:access_token': taxonomy.SECURITY_CREDENTIAL,
'OS-OAUTH1:request_token': taxonomy.SECURITY_CREDENTIAL,
'OS-OAUTH1:consumer': taxonomy.SECURITY_ACCOUNT,
}
SAML_AUDIT_TYPE = 'http://docs.oasis-open.org/security/saml/v2.0'

View File

@ -17,12 +17,14 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from pycadf import cadftaxonomy
from six.moves import urllib
from keystone.contrib import oauth1
from keystone.contrib.oauth1 import controllers
from keystone.contrib.oauth1 import core
from keystone import exception
from keystone.tests.unit.common import test_notifications
from keystone.tests.unit.ksfixtures import temporaryfile
from keystone.tests.unit import test_v3
@ -758,6 +760,128 @@ class MaliciousOAuth1Tests(OAuth1Tests):
self.post(endpoint, headers=headers, expected_status=500)
class OAuthNotificationTests(OAuth1Tests,
test_notifications.BaseNotificationTest):
def test_create_consumer(self):
consumer_ref = self._create_single_consumer()
self._assert_notify_sent(consumer_ref['id'],
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:consumer')
self._assert_last_audit(consumer_ref['id'],
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:consumer',
cadftaxonomy.SECURITY_ACCOUNT)
def test_update_consumer(self):
consumer_ref = self._create_single_consumer()
update_ref = {'consumer': {'description': uuid.uuid4().hex}}
self.oauth_api.update_consumer(consumer_ref['id'], update_ref)
self._assert_notify_sent(consumer_ref['id'],
test_notifications.UPDATED_OPERATION,
'OS-OAUTH1:consumer')
self._assert_last_audit(consumer_ref['id'],
test_notifications.UPDATED_OPERATION,
'OS-OAUTH1:consumer',
cadftaxonomy.SECURITY_ACCOUNT)
def test_delete_consumer(self):
consumer_ref = self._create_single_consumer()
self.oauth_api.delete_consumer(consumer_ref['id'])
self._assert_notify_sent(consumer_ref['id'],
test_notifications.DELETED_OPERATION,
'OS-OAUTH1:consumer')
self._assert_last_audit(consumer_ref['id'],
test_notifications.DELETED_OPERATION,
'OS-OAUTH1:consumer',
cadftaxonomy.SECURITY_ACCOUNT)
def test_oauth_flow_notifications(self):
"""Test to ensure notifications are sent for oauth tokens
This test is very similar to test_oauth_flow, however
there are additional checks in this test for ensuring that
notifications for request token creation, and access token
creation/deletion are emitted.
"""
consumer = self._create_single_consumer()
consumer_id = consumer['id']
consumer_secret = consumer['secret']
self.consumer = {'key': consumer_id, 'secret': consumer_secret}
self.assertIsNotNone(self.consumer['secret'])
url, headers = self._create_request_token(self.consumer,
self.project_id)
content = self.post(
url, headers=headers,
response_content_type='application/x-www-urlformencoded')
credentials = urllib.parse.parse_qs(content.result)
request_key = credentials['oauth_token'][0]
request_secret = credentials['oauth_token_secret'][0]
self.request_token = oauth1.Token(request_key, request_secret)
self.assertIsNotNone(self.request_token.key)
# Test to ensure the create request token notification is sent
self._assert_notify_sent(request_key,
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:request_token')
self._assert_last_audit(request_key,
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:request_token',
cadftaxonomy.SECURITY_CREDENTIAL)
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
resp = self.put(url, body=body, expected_status=200)
self.verifier = resp.result['token']['oauth_verifier']
self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier))
self.assertEqual(8, len(self.verifier))
self.request_token.set_verifier(self.verifier)
url, headers = self._create_access_token(self.consumer,
self.request_token)
content = self.post(
url, headers=headers,
response_content_type='application/x-www-urlformencoded')
credentials = urllib.parse.parse_qs(content.result)
access_key = credentials['oauth_token'][0]
access_secret = credentials['oauth_token_secret'][0]
self.access_token = oauth1.Token(access_key, access_secret)
self.assertIsNotNone(self.access_token.key)
# Test to ensure the create access token notification is sent
self._assert_notify_sent(access_key,
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:access_token')
self._assert_last_audit(access_key,
test_notifications.CREATED_OPERATION,
'OS-OAUTH1:access_token',
cadftaxonomy.SECURITY_CREDENTIAL)
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
% {'user': self.user_id,
'auth': self.access_token.key})
self.assertResponseStatus(resp, 204)
# Test to ensure the delete access token notification is sent
self._assert_notify_sent(access_key,
test_notifications.DELETED_OPERATION,
'OS-OAUTH1:access_token')
self._assert_last_audit(access_key,
test_notifications.DELETED_OPERATION,
'OS-OAUTH1:access_token',
cadftaxonomy.SECURITY_CREDENTIAL)
class OAuthCADFNotificationTests(OAuthNotificationTests):
def setUp(self):
"""Repeat the tests for CADF notifications """
super(OAuthCADFNotificationTests, self).setUp()
self.config_fixture.config(notification_format='cadf')
class JsonHomeTests(OAuth1Tests, test_v3.JsonHomeTestMixin):
JSON_HOME_DATA = {
'http://docs.openstack.org/api/openstack-identity/3/ext/OS-OAUTH1/1.0/'