Additional changes to fix minor service support stuff and increase test coverage.

Also making validate token call available using service admin tokens

Change-Id: Ic558ed9cb2bd3e9dafb5ea0e47cd020938ca1e57
This commit is contained in:
Yogeshwar Srikrishnan 2011-08-15 03:25:27 -05:00 committed by Ziad Sawalha
parent 8f32ce329b
commit fabeec2505
9 changed files with 269 additions and 349 deletions

View File

@ -76,7 +76,10 @@ class RoleAPI(BaseLdapAPI, BaseTenantAPI):
roles = self.get_all('(serviceId=%s)' % \
(ldap.filter.escape_filter_chars(service_id),))
try:
return roles[0]
res = []
for role in roles:
res.append(role)
return res
except IndexError:
return None

View File

@ -205,7 +205,8 @@ class EndpointTemplateAPI(BaseEndpointTemplateAPI):
session = get_session()
with session.begin():
endpoints = self.endpoint_get(id, session)
session.delete(endpoints)
if endpoints:
session.delete(endpoints)
def get():

View File

@ -113,7 +113,7 @@ class IdentityService(object):
return self.__get_auth_data(dtoken, tenant_id)
def validate_token(self, admin_token, token_id, belongs_to=None):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
if not api.TOKEN.get(token_id):
raise fault.UnauthorizedFault("Bad token, please reauthenticate")
@ -523,18 +523,23 @@ class IdentityService(object):
if not isinstance(role, Role):
raise fault.BadRequestFault("Expecting a Role")
if role.role_id == None:
if role.role_id == None or len(role.role_id.strip()) == 0:
raise fault.BadRequestFault("Expecting a Role Id")
if api.ROLE.get(role.role_id) != None:
raise fault.RoleConflictFault(
"A role with that id already exists")
#Check if the passed service exist.
if role.service_id != None and len(role.service_id.strip()) > 0 and\
api.SERVICE.get(role.service_id) == None:
raise fault.BadRequestFault(
"A service with that id doesnt exist.")
"A role with that id '" + role.role_id + "' already exists")
#Check if the passed service exist
#and the role begins with service_id:.
if role.service_id != None and\
len(role.service_id.strip()) > 0:
if api.SERVICE.get(role.service_id) == None:
raise fault.BadRequestFault(
"A service with that id doesnt exist.")
if not role.role_id.startswith(role.service_id + ":"):
raise fault.BadRequestFault(
"Role should begin with service id '" +
role.service_id + ":'")
drole = models.Role()
drole.id = role.role_id
@ -770,7 +775,7 @@ class IdentityService(object):
return dendpoint
def delete_endpoint(self, admin_token, endpoint_id):
self.__validate_admin_token(admin_token)
self.__validate_service_or_keystone_admin_token(admin_token)
api.ENDPOINT_TEMPLATE.endpoint_delete(endpoint_id)
return None

View File

@ -39,6 +39,7 @@ class AuthenticationTest(unittest.TestCase):
utils.create_endpoint(self.tenant, "4", str(self.auth_token))
def tearDown(self):
utils.delete_all_endpoint(self.tenant, self.auth_token)
utils.delete_token(self.token, self.auth_token)
def test_a_authorize(self):

View File

@ -709,6 +709,15 @@ def create_endpoint_xml(tenant_id, endpoint_templates_id, auth_token):
return (resp, content)
def delete_endpoint(tenant, endpoint_id, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/endpoints/%s' % (URL_V2, tenant, endpoint_id)
resp, _content = header.request(url, "DELETE", body='', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(auth_token)})
return (resp, _content)
def delete_all_endpoint(tenant_id, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/endpoints' % (URL_V2, tenant_id)
@ -731,11 +740,17 @@ def delete_all_endpoint(tenant_id, auth_token):
pass
else:
for endpoint in endpoints:
url = '%stenants/%s/endpoints/%s' % (
URL_V2, tenant_id, endpoint["id"])
header.request(url, "DELETE", body='', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(auth_token)})
delete_endpoint(tenant_id, endpoint["id"], auth_token)
def delete_endpoint(tenant_id, endpoint_id, auth_token):
header = httplib2.Http(".cache")
url = '%stenants/%s/endpoints/%s' % (
URL_V2, tenant_id, endpoint_id)
resp, content = header.request(url, "DELETE", body='', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(auth_token)})
return (resp, content)
def create_endpoint_template(region, service,

View File

@ -38,6 +38,7 @@ class EndpointTemplatesTest(unittest.TestCase):
self.user = utils.get_user()
self.userdisabled = utils.get_userdisabled()
self.auth_token = utils.get_auth_token()
self.service_token = utils.get_service_token()
self.exp_auth_token = utils.get_exp_auth_token()
self.disabled_token = utils.get_disabled_token()
self.missing_token = utils.get_none_token()
@ -46,6 +47,13 @@ class EndpointTemplatesTest(unittest.TestCase):
utils.create_user(self.tenant, self.user, self.auth_token)
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
self.region = 'DFW'
self.service = utils.get_test_service_id()
self.public_url = 'public'
self.admin_url = 'admin'
self.internal_url = 'internal'
self.enabled = True
self.is_global = False
def tearDown(self):
utils.delete_user(self.user, self.auth_token)
@ -55,20 +63,10 @@ class EndpointTemplatesTest(unittest.TestCase):
class CreateEndpointTemplatesTest(EndpointTemplatesTest):
def test_create_endpoint_template(self):
region = 'DFW'
service = utils.get_test_service_id()
public_url = 'public'
admin_url = 'admin'
internal_url = 'internal'
enabled = True
is_global = False
resp, content = utils.create_endpoint_template(\
region, service, public_url,\
admin_url, internal_url, enabled, is_global, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.region, self.service, self.public_url,\
self.admin_url, self.internal_url,\
self.enabled, self.is_global, self.auth_token)
self.assertEqual(201, int(resp['status']))
obj = json.loads(content)
if not "endpointTemplate" in obj:
@ -88,10 +86,31 @@ class CreateEndpointTemplatesTest(EndpointTemplatesTest):
self.fail("Not the expected service")
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_and_delete_endpoint_template_that_has_dependencies(self):
resp, content = utils.create_endpoint_template(\
self.region, self.service, self.public_url,\
self.admin_url, self.internal_url,\
self.enabled, self.is_global, self.auth_token)
self.assertEqual(201, int(resp['status']))
obj = json.loads(content)
if not "endpointTemplate" in obj:
raise fault.BadRequestFault("Expecting endpointTemplate")
endpoint_template = obj["endpointTemplate"]
if not "id" in endpoint_template:
endpoint_template_id = None
else:
endpoint_template_id = endpoint_template["id"]
if endpoint_template_id == None:
self.fail("Not the expected Endpoint Template")
resp, _content = utils.create_endpoint_xml(self.tenant,
endpoint_template_id,
str(self.auth_token))
resp_val = int(resp['status'])
self.assertEqual(201, resp_val)
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.auth_token)
self.assertEqual(204, int(resp['status']))
def test_create_endpoint_template_xml(self):
@ -103,13 +122,8 @@ class CreateEndpointTemplatesTest(EndpointTemplatesTest):
enabled = True
is_global = False
resp, content = utils.create_endpoint_template_xml(
region, service, public_url, admin_url,
internal_url, enabled, is_global, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.region, self.service, self.public_url, self.admin_url,
self.internal_url, self.enabled, self.is_global, self.auth_token)
self.assertEqual(201, int(resp['status']))
#verify content
@ -129,10 +143,32 @@ class CreateEndpointTemplatesTest(EndpointTemplatesTest):
self.fail("Not the expected service")
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_endpoint_template_using_service_admin_token(self):
resp, content = utils.create_endpoint_template(
self.region, self.service, self.public_url, self.admin_url,
self.internal_url, self.enabled, self.is_global,
self.service_token)
self.assertEqual(201, int(resp['status']))
obj = json.loads(content)
if not "endpointTemplate" in obj:
raise fault.BadRequestFault("Expecting endpointTemplate")
endpoint_template = obj["endpointTemplate"]
if not "id" in endpoint_template:
endpoint_template_id = None
else:
endpoint_template_id = endpoint_template["id"]
if endpoint_template_id == None:
self.fail("Not the expected Endpoint Template")
if not "serviceId" in endpoint_template:
service_id = None
else:
service_id = endpoint_template["serviceId"]
if service_id != utils.get_test_service_id():
self.fail("Not the expected service")
resp, content = utils.delete_endpoint_template(
endpoint_template_id, self.service_token)
self.assertEqual(204, int(resp['status']))
@ -144,10 +180,20 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
if not "endpointTemplates" in obj:
raise self.fail("Expecting endpointTemplates")
def test_get_endpoint_templates_using_service_admin_token(self):
header = httplib2.Http(".cache")
url = '%sendpointTemplates' % (utils.URL_V2)
#test for Content-Type = application/json
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.service_token})
self.assertEqual(200, int(resp['status']))
#verify content
@ -162,10 +208,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_templates_using_disabled_auth_token(self):
@ -175,10 +217,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.disabled_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_templates_using_missing_auth_token(self):
@ -188,10 +226,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.missing_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_endpoint_templates_using_invalid_auth_token(self):
@ -201,10 +235,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.invalid_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_endpoint_templates_xml(self):
@ -215,10 +245,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -237,10 +263,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_templates_xml_disabled_auth_token(self):
@ -251,10 +273,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.disabled_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_templates_xml_missing_auth_token(self):
@ -265,10 +283,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.missing_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_endpoint_templates_xml_invalid_auth_token(self):
@ -279,10 +293,6 @@ class GetEndpointTemplatesTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.invalid_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
@ -294,12 +304,18 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
if not "endpointTemplate" in obj:
raise self.fail("Expecting endpointTemplate")
def test_get_endpoint_using_service_admin_token(self):
header = httplib2.Http(".cache")
url = '%sendpointTemplates/%s' % (utils.URL_V2, '1')
#test for Content-Type = application/json
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.service_token})
#verify content
obj = json.loads(content)
if not "endpointTemplate" in obj:
@ -312,10 +328,6 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_using_disabled_auth_token(self):
@ -325,10 +337,6 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.disabled_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_using_missing_auth_token(self):
@ -338,10 +346,6 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.missing_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_endpoint_using_invalid_auth_token(self):
@ -351,10 +355,6 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.invalid_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_endpoint_xml(self):
@ -365,10 +365,6 @@ class GetEndpointTemplateTest(EndpointTemplatesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -409,17 +405,38 @@ class CreateEndpointRefsTest(EndpointTemplatesTest):
resp_val = int(resp['status'])
self.assertEqual(404, resp_val)
def test_endpoint_create_json(self):
_header = httplib2.Http(".cache")
utils.delete_endpoint(
self.tenant, "1", self.auth_token)
resp, _content = utils.create_endpoint(self.tenant, "1",
str(self.auth_token))
resp_val = int(resp['status'])
self.assertEqual(201, resp_val)
resp, _content = utils.delete_endpoint(
self.tenant, '1', self.auth_token)
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
def test_endpoint_create_using_service_admin_token(self):
_header = httplib2.Http(".cache")
resp, _content = utils.create_endpoint(self.tenant, "1",
str(self.service_token))
resp_val = int(resp['status'])
self.assertEqual(201, resp_val)
resp, _content = utils.delete_endpoint(
self.tenant, '1', self.service_token)
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
def test_endpoint_create_xml(self):
header = httplib2.Http(".cache")
resp, _content = utils.create_endpoint_xml(self.tenant, "1",
str(self.auth_token))
resp_val = int(resp['status'])
self.assertEqual(201, resp_val)
url = '%stenants/%s/endpoints/%s' % (URL_V2, self.tenant, '1')
resp, _content = header.request(url, "DELETE", body='', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(self.auth_token)})
resp, _content = utils.delete_endpoint(
self.tenant, '1', self.auth_token)
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
@ -489,10 +506,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.auth_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
def test_get_endpoint_xml_using_expired_auth_token(self):
@ -503,10 +516,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.exp_auth_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_xml_using_disabled_auth_token(self):
@ -517,10 +526,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.disabled_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_endpoint_xml_using_missing_auth_token(self):
@ -531,10 +536,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.missing_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_endpoint_xml_using_invalid_auth_token(self):
@ -545,10 +546,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.invalid_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_endpoint_json(self):
@ -559,10 +556,19 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/json",
"X-Auth-Token": str(self.auth_token),
"ACCEPT": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
obj = json.loads(content)
if not "endpoints" in obj:
raise self.fail("Expecting endpoints")
def test_get_endpoint_json(self):
header = httplib2.Http(".cache")
url = '%stenants/%s/endpoints' % (URL_V2, self.tenant)
#test for Content-Type = application/json
resp, content = header.request(url, "GET", body='{}', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(self.service_token),
"ACCEPT": "application/json"})
self.assertEqual(200, int(resp['status']))
obj = json.loads(content)
if not "endpoints" in obj:
@ -576,10 +582,6 @@ class GetEndPointTest(EndpointTemplatesTest):
headers={"Content-Type": "application/json",
"X-Auth-Token": str(self.exp_auth_token),
"ACCEPT": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
_obj = json.loads(content)
@ -591,10 +593,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/json",
"X-Auth-Token": str(self.disabled_token),
"ACCEPT": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
_obj = json.loads(content)
@ -606,10 +604,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/json",
"X-Auth-Token": str(self.missing_token),
"ACCEPT": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
_obj = json.loads(content)
@ -621,10 +615,6 @@ class GetEndPointTest(EndpointTemplatesTest):
"Content-Type": "application/json",
"X-Auth-Token": str(self.invalid_token),
"ACCEPT": "application/json"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
_obj = json.loads(content)

View File

@ -48,6 +48,7 @@ class RolesTest(unittest.TestCase):
self.token = utils.get_token(self.user, 'secrete', self.tenant,
'token')
self.service_id = utils.get_test_service_id()
self.service_role = self.service_id + ':test_role'
def tearDown(self):
utils.delete_user(self.user, self.auth_token)
@ -57,77 +58,50 @@ class RolesTest(unittest.TestCase):
class CreateRolesTest(RolesTest):
def test_create_role(self):
resp, content = utils.create_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
resp, content = utils.delete_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_role_using_service_token(self):
resp, content = utils.create_role('test_role', self.service_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(201, int(resp['status']))
resp, content = utils.delete_role('test_role', self.service_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(resp['status']))
def test_create_roles_using_invalid_tokens(self):
resp, content = utils.create_role('test_role', self.disabled_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.create_role('test_role', self.missing_token)
self.assertEqual(401, int(resp['status']))
resp, content = utils.create_role('test_role', self.exp_auth_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.create_role('test_role', self.invalid_token)
self.assertEqual(404, int(resp['status']))
def test_delete_roles_using_invalid_tokens(self):
resp, content = utils.delete_role('test_role', self.disabled_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.delete_role('test_role', self.missing_token)
self.assertEqual(401, int(resp['status']))
resp, content = utils.delete_role('test_role', self.exp_auth_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.delete_role('test_role', self.invalid_token)
self.assertEqual(404, int(resp['status']))
def test_create_and_delete_role_that_has_references(self):
resp, content = utils.create_role('test_role', self.auth_token)
self.assertEqual(201, int(resp['status']))
utils.create_role_ref(
self.user, "test_role",
self.tenant, self.auth_token)
resp, content = utils.delete_role('test_role', self.auth_token)
self.assertEqual(204, int(resp['status']))
def test_create_role_mapped_to_a_service(self):
resp, content = utils.create_role_mapped_to_service(
'test_role', self.auth_token, self.service_id)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.service_role, self.auth_token, self.service_id)
self.assertEqual(201, int(resp['status']))
resp, content = utils.get_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
resp, content = utils.get_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
resp, content = utils.get_role(self.service_role, self.auth_token)
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
@ -138,7 +112,8 @@ class CreateRolesTest(RolesTest):
role_id = None
else:
role_id = role["id"]
if role_id != 'test_role':
if role_id != self.service_role:
self.fail("Not the expected Role")
if not "serviceId" in role:
service_id = None
@ -146,26 +121,14 @@ class CreateRolesTest(RolesTest):
service_id = role["serviceId"]
if service_id != self.service_id:
self.fail("Not the expected service")
resp, content = utils.delete_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
resp, content = utils.delete_role(self.service_role, self.auth_token)
self.assertEqual(204, int(resp['status']))
def test_create_role_mapped_to_a_service_xml(self):
resp, content = utils.create_role_mapped_to_service_xml(
'test_role', self.auth_token, self.service_id)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.service_role, self.auth_token, self.service_id)
self.assertEqual(201, int(resp['status']))
resp, content = utils.get_role_xml('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
resp, content = utils.get_role_xml(self.service_role, self.auth_token)
self.assertEqual(200, int(resp['status']))
#verify content
@ -176,19 +139,20 @@ class CreateRolesTest(RolesTest):
if role == None:
self.fail("Expecting Role")
role_id = role.get("id")
if role_id != 'test_role':
if role_id != self.service_role:
self.fail("Not the expected Role")
service_id = role.get("serviceId")
if service_id != self.service_id:
self.fail("Not the expected service")
resp, content = utils.delete_role('test_role', self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
resp, content = utils.delete_role(self.service_role, self.auth_token)
self.assertEqual(204, int(resp['status']))
def test_create_role_mapped_to_a_service_using_incorrect_role_name(self):
resp, content = utils.create_role_mapped_to_service(
'test_role', self.auth_token, self.service_id)
self.assertEqual(400, int(resp['status']))
class GetRolesTest(RolesTest):
def test_get_roles(self):
@ -198,10 +162,6 @@ class GetRolesTest(RolesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -227,10 +187,6 @@ class GetRolesTest(RolesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.service_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -257,10 +213,6 @@ class GetRolesTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
# Validate Returned Content
dom = etree.Element("root")
@ -285,10 +237,6 @@ class GetRolesTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_roles_exp_token_xml(self):
@ -299,10 +247,6 @@ class GetRolesTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
@ -317,10 +261,6 @@ class GetRoleTest(RolesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
@ -342,10 +282,6 @@ class GetRoleTest(RolesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.service_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
@ -368,10 +304,6 @@ class GetRoleTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -392,10 +324,6 @@ class GetRoleTest(RolesTest):
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_role_xml_bad(self):
@ -405,10 +333,6 @@ class GetRoleTest(RolesTest):
resp, _content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_role_expired_token(self):
@ -419,10 +343,6 @@ class GetRoleTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_role_xml_using_expired_token(self):
@ -434,10 +354,6 @@ class GetRoleTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_role_using_disabled_token(self):
@ -463,10 +379,6 @@ class GetRoleTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.disabled_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_role_using_missing_token(self):
@ -477,10 +389,6 @@ class GetRoleTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.missing_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_role_xml_using_missing_token(self):
@ -492,10 +400,6 @@ class GetRoleTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.missing_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_role_using_invalid_token(self):
@ -506,10 +410,6 @@ class GetRoleTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.invalid_token})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_role_xml_using_invalid_token(self):
@ -521,10 +421,6 @@ class GetRoleTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.invalid_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
@ -592,10 +488,6 @@ class GetRoleRefsTest(RolesTest):
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": str(self.auth_token)})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
@ -615,10 +507,6 @@ class GetRoleRefsTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": str(self.auth_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
dom = etree.Element("root")
@ -636,14 +524,10 @@ class GetRoleRefsTest(RolesTest):
str(self.auth_token))
url = '%susers/%s/roleRefs' % (URL_V2, self.user)
#test for Content-Type = application/xml
resp, content = header.request(url, "GET", body='{}', headers={
"Content-Type": "application/xml",
resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/xml",
"X-Auth-Token": str(self.service_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
dom = etree.Element("root")
@ -664,10 +548,6 @@ class GetRoleRefsTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": str(self.exp_auth_token)})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_rolerefs_xml_using_expired_token(self):
@ -682,10 +562,6 @@ class GetRoleRefsTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": str(self.exp_auth_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_rolerefs_using_disabled_token(self):
@ -698,10 +574,6 @@ class GetRoleRefsTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": str(self.disabled_token)})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_rolerefs_xml_using_disabled_token(self):
@ -716,10 +588,6 @@ class GetRoleRefsTest(RolesTest):
headers={"Content-Type": "application/xml",
"X-Auth-Token": str(self.disabled_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
def test_get_rolerefs_using_missing_token(self):
@ -732,10 +600,6 @@ class GetRoleRefsTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(self.missing_token)})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_rolerefs_xml_using_missing_token(self):
@ -749,10 +613,6 @@ class GetRoleRefsTest(RolesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.missing_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
def test_get_rolerefs_json_using_invalid_token(self):
@ -765,10 +625,6 @@ class GetRoleRefsTest(RolesTest):
resp, _content = header.request(url, "GET", body='{}', headers={
"Content-Type": "application/json",
"X-Auth-Token": str(self.invalid_token)})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_rolerefs_xml_using_invalid_token(self):
@ -783,10 +639,6 @@ class GetRoleRefsTest(RolesTest):
"Content-Type": "application/xml",
"X-Auth-Token": str(self.invalid_token),
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))

View File

@ -57,10 +57,6 @@ class ServicesTest(unittest.TestCase):
class GetServicesTest(ServicesTest):
def test_get_services_using_keystone_admin_token_json(self):
resp, content = utils.get_services(self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
@ -78,12 +74,7 @@ class GetServicesTest(ServicesTest):
def test_get_services_using_keystone_admin_token_xml(self):
resp, content = utils.get_services_xml(self.auth_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
# verify content
# Validate Returned Content
dom = etree.Element("root")
@ -106,10 +97,6 @@ class GetServicesTest(ServicesTest):
def test_get_services_using_service_admin_token(self):
resp, content = utils.get_services(self.service_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
#verify content
obj = json.loads(content)
@ -127,10 +114,6 @@ class GetServicesTest(ServicesTest):
def test_get_services_using_service_admin_token_xml(self):
resp, content = utils.get_services_xml(self.service_token)
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
# Verify content
# Validate Returned Content
@ -155,13 +138,10 @@ class GetServicesTest(ServicesTest):
def test_get_services_using_invalid_tokens(self):
resp, content = utils.get_services(self.disabled_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.get_services(self.missing_token)
self.assertEqual(401, int(resp['status']))
resp, content = utils.get_services(self.exp_auth_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.get_services(self.invalid_token)
self.assertEqual(404, int(resp['status']))
@ -203,6 +183,20 @@ class GetServiceTest(ServicesTest):
resp_val = int(resp['status'])
self.assertEqual(404, resp_val)
def test_get_service_using_invalid_tokens(self):
resp, content = utils.get_service(
self.sample_service, self.disabled_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.get_service(
self.sample_service, self.missing_token)
self.assertEqual(401, int(resp['status']))
resp, content = utils.get_service(
self.sample_service, self.exp_auth_token)
self.assertEqual(403, int(resp['status']))
resp, content = utils.get_service(
self.sample_service, self.invalid_token)
self.assertEqual(404, int(resp['status']))
def test_get_service_using_invalid_tokens(self):
resp, content = utils.get_service(
self.sample_service, self.disabled_token)
@ -278,6 +272,42 @@ class DeleteServiceTest(ServicesTest):
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
def test_service_which_has_dependencies_delete(self):
resp, _content = utils.create_service(
self.test_service, str(self.auth_token))
resp, _content = utils.create_role_mapped_to_service(
self.test_service + ":test_role",
self.auth_token, self.test_service)
self.assertEqual(201, int(resp['status']))
resp, _content = utils.create_role_ref(
self.user, self.test_service +
":test_role", self.tenant, self.auth_token)
self.assertEqual(201, int(resp['status']))
resp, _content = utils.create_endpoint_template(\
'DFW', self.test_service, 'public_url',\
'admin_url', 'internal_url', True, False, self.auth_token)
self.assertEqual(201, int(resp['status']))
#verify content
obj = json.loads(_content)
if not "endpointTemplate" in obj:
raise fault.BadRequestFault("Expecting endpointTemplate")
endpoint_template = obj["endpointTemplate"]
if not "id" in endpoint_template:
endpoint_template_id = None
else:
endpoint_template_id = endpoint_template["id"]
resp, _content = utils.create_endpoint_xml(self.tenant,
endpoint_template_id,
str(self.auth_token))
resp_val = int(resp['status'])
self.assertEqual(201, resp_val)
resp, _content = utils.delete_service(
self.test_service, self.auth_token)
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
def test_service_delete_json_using_expired_token(self):
resp, _content = utils.delete_service(
self.test_service, str(self.exp_auth_token))

View File

@ -76,6 +76,29 @@ class ValidateToken(unittest.TestCase):
role_ref_id = role_ref["id"]
self.assertEqual(self.role_ref_id, role_ref_id)
def test_validate_token_true_using_service_token(self):
header = httplib2.Http(".cache")
url = '%stokens/%s?belongsTo=%s' % (utils.URL_V2, self.token,
self.tenant)
resp, content = header.request(url, "GET", body='', headers={
"Content-Type": "application/json",
"X-Auth-Token": utils.get_service_token()})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
self.assertEqual('application/json', utils.content_type(resp))
#verify content
obj = json.loads(content)
if not "auth" in obj:
raise self.fail("Expecting Auth")
role_refs = obj["auth"]["user"]["roleRefs"]
role_ref = role_refs[0]
role_ref_id = role_ref["id"]
self.assertEqual(self.role_ref_id, role_ref_id)
def test_validate_token_true_xml(self):
header = httplib2.Http(".cache")
url = '%stokens/%s?belongsTo=%s' % (