Add Multi-user support for Functional Tests

This commit adds the infrastructure for multi user testing.  It also adds
a small set of test cases that exercises RBAC policy for secret and
container reads.

Six users with four roles and two projects are added.
   In Project A: admin_a, creator_a, observer_a, auditor_a
   In Project B: admin_b, observer_b

Get Secrets and Get Containers are tested for each user.

Implements: blueprint add-run-as-for-functional-tests
Relates-to: blueprint multi-user-functional-tests

Change-Id: I65c820440c014301cfce90d360440d3e12e7ffba
This commit is contained in:
Dave McCowan 2015-04-21 17:56:17 -04:00
parent f39eb03916
commit 28135c1099
11 changed files with 500 additions and 69 deletions

View File

@ -20,8 +20,8 @@ export OS_USERNAME="nova"
#export OS_SERVICE_ENDPOINT=http://localhost:35357/v2.0
# ========================================
echo " OS_SERVICE_ENDPOINT="$OS_SEVICE_ENDPOINT
echo " SERVICE_TOKEN="$SERVICE_TOKEN
echo " OS_SERVICE_ENDPOINT="$OS_SERVICE_ENDPOINT
echo " SERVICE_TOKEN="$OS_SERVICE_TOKEN
echo " OS_TENANT_NAME="$OS_TENANT_NAME
echo " OS_USERNAME="$OS_USERNAME
echo " OS_PASSWORD="$OS_PASSWORD
@ -30,7 +30,7 @@ echo " OS_AUTH_URL="$OS_AUTH_URL
#test with
keystone tenant-list
function get_id () {
function get_id {
echo `"$@" | awk '/ id / { print $4 }'`
}
@ -46,22 +46,109 @@ KEYSTONE_CATALOG_BACKEND='sql'
#============================
# Lookups
SERVICE_TENANT=$(keystone tenant-list | awk "/ $SERVICE_TENANT_NAME / { print \$2 }")
SERVICE_TENANT=$(get_id keystone tenant-create --name="$SERVICE_TENANT_NAME")
ADMIN_ROLE=$(keystone role-list | awk "/ admin / { print \$2 }")
MEMBER_ROLE=$(keystone role-list | awk "/ Member / { print \$2 }")
MEMBER_ROLE=$(keystone role-list | awk "/ _member_ / { print \$2 }")
# Ports to avoid: 3333, 5000, 8773, 8774, 8776, 9292, 9696, 35357
# Barbican
if [[ "$ENABLED_SERVICES" =~ "barbican" ]]; then
#
# Setup Default Admin User
#
BARBICAN_USER=$(get_id keystone user-create \
--name=barbican \
--name="barbican" \
--pass="$SERVICE_PASSWORD" \
--tenant_id $SERVICE_TENANT \
--email=barbican@example.com)
--tenant_id="$SERVICE_TENANT" \
--email="barbican@example.com")
keystone user-role-add \
--tenant_id $SERVICE_TENANT \
--user_id $BARBICAN_USER \
--role_id $ADMIN_ROLE
--tenant_id="$SERVICE_TENANT" \
--user_id="$BARBICAN_USER" \
--role_id="$ADMIN_ROLE"
#
# Setup RBAC User Projects and Roles
#
USER_PASSWORD="barbican"
PROJECT_A_ID=$(get_id keystone tenant-create \
--name="project_a")
PROJECT_B_ID=$(get_id keystone tenant-create \
--name="project_b")
ROLE_ADMIN_ID=$(get_id keystone role-get admin)
ROLE_CREATOR_ID=$(get_id keystone role-create \
--name="creator")
ROLE_OBSERVER_ID=$(get_id keystone role-create \
--name="observer")
ROLE_AUDIT_ID=$(get_id keystone role-create \
--name="audit")
#
# Setup RBAC Admin of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_admin" \
--pass="$USER_PASSWORD" \
--email="admin_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_ADMIN_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Creator of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_creator" \
--pass="$USER_PASSWORD" \
--email="creator_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_CREATOR_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Observer of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_observer" \
--pass="$USER_PASSWORD" \
--email="observer_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_OBSERVER_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Auditor of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_auditor" \
--pass="$USER_PASSWORD" \
--email="auditor_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_AUDIT_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Admin of Project B
#
USER_ID=$(get_id keystone user-create \
--name="project_b_admin" \
--pass="$USER_PASSWORD" \
--email="admin_b@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_ADMIN_ID" \
--tenant-id="$PROJECT_B_ID"
#
# Setup RBAC Observer of Project B
#
USER_ID=$(get_id keystone user-create \
--name="project_b_observer" \
--pass="$USER_PASSWORD" \
--email="observer_b@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_OBSERVER_ID" \
--tenant-id="$PROJECT_B_ID"
#
# Setup Admin Endpoint
#
if [[ "$KEYSTONE_CATALOG_BACKEND" = 'sql' ]]; then
BARBICAN_SERVICE=$(get_id keystone service-create \
--name=barbican \

View File

@ -190,7 +190,14 @@ function stop_barbican {
screen_stop barbican
}
function get_id {
echo `"$@" | awk '/ id / { print $4 }'`
}
function create_barbican_accounts {
#
# Setup Default Admin User
#
SERVICE_TENANT=$(keystone tenant-list | awk "/ $SERVICE_TENANT_NAME / { print \$2 }")
ADMIN_ROLE=$(keystone role-list | awk "/ admin / { print \$2 }")
@ -202,6 +209,90 @@ function create_barbican_accounts {
keystone user-role-add --tenant-id $SERVICE_TENANT \
--user-id $BARBICAN_USER \
--role-id $ADMIN_ROLE
#
# Setup RBAC User Projects and Roles
#
PASSWORD="barbican"
PROJECT_A_ID=$(get_id keystone tenant-create \
--name="project_a")
PROJECT_B_ID=$(get_id keystone tenant-create \
--name="project_b")
ROLE_ADMIN_ID=$(get_id keystone role-get admin)
ROLE_CREATOR_ID=$(get_id keystone role-create \
--name="creator")
ROLE_OBSERVER_ID=$(get_id keystone role-create \
--name="observer")
ROLE_AUDIT_ID=$(get_id keystone role-create \
--name="audit")
#
# Setup RBAC Admin of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_admin" \
--pass="$PASSWORD" \
--email="admin_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_ADMIN_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Creator of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_creator" \
--pass="$PASSWORD" \
--email="creator_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_CREATOR_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Observer of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_observer" \
--pass="$PASSWORD" \
--email="observer_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_OBSERVER_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Auditor of Project A
#
USER_ID=$(get_id keystone user-create \
--name="project_a_auditor" \
--pass="$PASSWORD" \
--email="auditor_a@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_AUDIT_ID" \
--tenant-id="$PROJECT_A_ID"
#
# Setup RBAC Admin of Project B
#
USER_ID=$(get_id keystone user-create \
--name="project_b_admin" \
--pass="$PASSWORD" \
--email="admin_b@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_ADMIN_ID" \
--tenant-id="$PROJECT_B_ID"
#
# Setup RBAC Observer of Project B
#
USER_ID=$(get_id keystone user-create \
--name="project_b_observer" \
--pass="$PASSWORD" \
--email="observer_b@example.net")
keystone user-role-add \
--user="$USER_ID" \
--role="$ROLE_OBSERVER_ID" \
--tenant-id="$PROJECT_B_ID"
#
# Setup Admin Endpoint
#
if [[ "$KEYSTONE_CATALOG_BACKEND" = 'sql' ]]; then
BARBICAN_SERVICE=$(keystone service-create \
--name=barbican \

View File

@ -10,6 +10,23 @@ project_name=admin
password=secretadmin
domain_name=Default
[rbac_users]
# Replace these values that represent additional users for RBAC testing
project_a=project_a
project_b=project_b
admin_a=project_a_admin
admin_a_password=barbican
creator_a=project_a_creator
creator_a_password=barbican
observer_a=project_a_observer
observer_a_password=barbican
auditor_a=project_a_auditor
auditor_a_password=barbican
admin_b=project_b_admin
admin_b_password=barbican
observer_b=project_b_observer
observer_b_password=barbican
[keymanager]
# use this to run the functional tests against a

View File

@ -49,3 +49,7 @@ class BaseBehaviors(object):
if href and len(href) > 0:
base, item_id = os.path.split(href)
return item_id
def get_user_id_from_name(self, user_name):
"""From a configured user name, get the unique user id from keystone"""
return self.client.get_user_id_from_name(user_name)

View File

@ -20,39 +20,47 @@ from functionaltests.api.v1.models import container_models
class ContainerBehaviors(base_behaviors.BaseBehaviors):
def create_container(self, model, extra_headers=None):
def create_container(self, model, extra_headers=None,
user_name=None, admin=None):
"""Create a container from the data in the model.
:param model: The metadata used to create the container
:param extra_headers: Headers used to create the container
:param user_name: The user name used to create the container
:param admin: The user with permissions to delete the container
:return: A tuple containing the response from the create
and the href to the newly created container
"""
resp = self.client.post('containers', request_model=model,
extra_headers=extra_headers)
extra_headers=extra_headers,
user_name=user_name)
returned_data = self.get_json(resp)
container_ref = returned_data.get('container_ref')
if container_ref:
self.created_entities.append(container_ref)
if admin is None:
admin = user_name
self.created_entities.append((container_ref, admin))
return resp, container_ref
def get_container(self, container_ref, extra_headers=None):
def get_container(self, container_ref, extra_headers=None, user_name=None):
"""Handles getting a single container
:param container_ref: Reference to the container to be retrieved
:param extra_headers: Headers used to get the container
:param user_name: The user name used to get the container
:return: The response of the GET.
"""
resp = self.client.get(
container_ref, response_model_type=container_models.ContainerModel)
container_ref, response_model_type=container_models.ContainerModel,
user_name=user_name)
return resp
def get_containers(self, limit=10, offset=0, extra_headers=None):
def get_containers(self, limit=10, offset=0, extra_headers=None,
user_name=None):
"""Handles getting a list of containers.
:param limit: limits number of returned containers
@ -60,12 +68,14 @@ class ContainerBehaviors(base_behaviors.BaseBehaviors):
the list
:param extra_headers: Extra headers used to retrieve a list of
containers
:param user_name: The user name used to get the list
:return: Returns the response, a list of container models, and
references to the next and previous list of containers.
"""
params = {'limit': limit, 'offset': offset}
resp = self.client.get('containers', params=params)
resp = self.client.get('containers', params=params,
user_name=user_name)
container_list = self.get_json(resp)
@ -75,7 +85,7 @@ class ContainerBehaviors(base_behaviors.BaseBehaviors):
return resp, containers, next_ref, prev_ref
def delete_container(self, container_ref, extra_headers=None,
expected_fail=False):
expected_fail=False, user_name=None):
"""Handles deleting a containers.
:param container_ref: Reference of the container to be deleted
@ -83,19 +93,21 @@ class ContainerBehaviors(base_behaviors.BaseBehaviors):
:param expected_fail: If there is a negative test, this should be
marked true if you are trying to delete a container that does
not exist.
:param user_name: The user name used to delete the container
:return: Response of the delete.
"""
resp = self.client.delete(container_ref, extra_headers)
resp = self.client.delete(container_ref, extra_headers,
user_name=user_name)
if not expected_fail:
self.created_entities.remove(container_ref)
for item in self.created_entities:
if item[0] == container_ref:
self.created_entities.remove(item)
return resp
def delete_all_created_containers(self):
"""Delete all of the containers that we have created."""
containers_to_delete = [container for container
in self.created_entities]
for container_ref in containers_to_delete:
self.delete_container(container_ref)
entities = list(self.created_entities)
for (container_ref, admin) in entities:
self.delete_container(container_ref, user_name=admin)

View File

@ -19,17 +19,21 @@ from functionaltests.api.v1.models import secret_models
class SecretBehaviors(base_behaviors.BaseBehaviors):
def create_secret(self, model, headers=None, use_auth=True):
def create_secret(self, model, headers=None, use_auth=True,
user_name=None, admin=None):
"""Create a secret from the data in the model.
:param model: The metadata used to create the secret
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to create the secret
:param admin: The user with permissions to delete the secrets
:return: A tuple containing the response from the create
and the href to the newly created secret
"""
resp = self.client.post('secrets', request_model=model,
extra_headers=headers, use_auth=use_auth)
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
@ -38,12 +42,15 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
returned_data = self.get_json(resp)
secret_ref = returned_data.get('secret_ref')
if secret_ref:
self.created_entities.append(secret_ref)
if admin is None:
admin = user_name
self.created_entities.append((secret_ref, admin))
return resp, secret_ref
def update_secret_payload(self, secret_ref, payload, payload_content_type,
payload_content_encoding=None,
extra_headers=None, use_auth=True):
extra_headers=None, use_auth=True,
user_name=None):
"""Updates a secret's payload data.
:param secret_ref: HATEOS ref of the secret to be updated
@ -52,6 +59,7 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
:param payload_content_encoding: value for the Content-Encoding header
:param extra_headers: Optional HTTP headers to add to the request
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to update the secret
:return: the response from the PUT update
"""
@ -69,7 +77,7 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
def get_secret(self, secret_ref, payload_content_type,
payload_content_encoding=None, extra_headers=None,
use_auth=True):
use_auth=True, user_name=None):
headers = {'Accept': payload_content_type,
'Accept-Encoding': payload_content_encoding}
@ -78,11 +86,13 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
headers.update(extra_headers)
return self.client.get(secret_ref + '/payload',
extra_headers=headers, use_auth=use_auth)
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
def get_secret_based_on_content_type(self, secret_ref,
payload_content_type,
payload_content_encoding=None):
payload_content_encoding=None,
user_name=None):
"""Retrieves a secret's payload based on the content type
NOTE: This way will be deprecated in subsequent versions of the API.
@ -91,21 +101,23 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
headers = {'Accept': payload_content_type,
'Accept-Encoding': payload_content_encoding}
return self.client.get(secret_ref, extra_headers=headers)
return self.client.get(secret_ref, extra_headers=headers,
user_name=user_name)
def get_secret_metadata(self, secret_ref, use_auth=True):
def get_secret_metadata(self, secret_ref, use_auth=True, user_name=None):
"""Retrieves a secret's metadata.
:param secret_ref: HATEOS ref of the secret to be retrieved
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to get the metadata
:return: A request response object
"""
return self.client.get(
secret_ref, response_model_type=secret_models.SecretModel,
use_auth=use_auth)
use_auth=use_auth, user_name=user_name)
def get_secrets(self, limit=10, offset=0, name_filter=None,
extra_headers=None, use_auth=True):
extra_headers=None, use_auth=True, user_name=None):
"""Handles getting a list of secrets.
:param limit: limits number of returned secrets
@ -115,12 +127,14 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
those whose name matches the filter.
:param extra_headers: Optional HTTP headers to add to the request
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to list the secrets
"""
params = {'limit': limit, 'offset': offset}
if name_filter:
params['name'] = name_filter
resp = self.client.get('secrets', params=params,
extra_headers=extra_headers, use_auth=use_auth)
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
# handle expected JSON parsing errors for unauthenticated requests
if resp.status_code == 401 and not use_auth:
@ -134,29 +148,28 @@ class SecretBehaviors(base_behaviors.BaseBehaviors):
return resp, secrets, next_ref, prev_ref
def delete_secret(self, secret_ref, extra_headers=None,
expected_fail=False, use_auth=True):
expected_fail=False, use_auth=True, user_name=None):
"""Delete a secret.
:param secret_ref: HATEOS ref of the secret to be deleted
:param extra_headers: Optional HTTP headers to add to the request
:param expected_fail: If test is expected to fail the deletion
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used to delete the secret
:return A request response object
"""
resp = self.client.delete(secret_ref, extra_headers=extra_headers,
use_auth=use_auth)
use_auth=use_auth, user_name=user_name)
if not expected_fail:
self.created_entities.remove(secret_ref)
for item in self.created_entities:
if item[0] == secret_ref:
self.created_entities.remove(item)
return resp
def delete_all_created_secrets(self):
"""Delete all of the secrets that we have created."""
slist = []
for entity in self.created_entities:
slist.append(entity)
for secret_ref in slist:
self.delete_secret(secret_ref)
entities = list(self.created_entities)
for (secret_ref, admin) in entities:
self.delete_secret(secret_ref, user_name=admin)

View File

@ -0,0 +1,131 @@
# Copyright (c) 2015 Cisco Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican.tests import utils
from functionaltests.api import base
from functionaltests.api.v1.behaviors import container_behaviors
from functionaltests.api.v1.behaviors import secret_behaviors
from functionaltests.api.v1.models import container_models
from functionaltests.api.v1.models import secret_models
from functionaltests.common import config
CONF = config.get_config()
admin_a = CONF.rbac_users.admin_a
creator_a = CONF.rbac_users.creator_a
observer_a = CONF.rbac_users.observer_a
auditor_a = CONF.rbac_users.auditor_a
admin_b = CONF.rbac_users.admin_b
observer_b = CONF.rbac_users.observer_b
test_data_rbac_read_secret = {
'with_admin_a': {'user': admin_a, 'expected_return': 200},
'with_creator_a': {'user': creator_a, 'expected_return': 200},
'with_observer_a': {'user': observer_a, 'expected_return': 200},
'with_auditor_a': {'user': auditor_a, 'expected_return': 403},
'with_admin_b': {'user': admin_b, 'expected_return': 403},
'with_observer_b': {'user': observer_b, 'expected_return': 403},
}
test_data_rbac_read_container = {
'with_admin_a': {'user': admin_a, 'expected_return': 200},
'with_creator_a': {'user': creator_a, 'expected_return': 200},
'with_observer_a': {'user': observer_a, 'expected_return': 200},
'with_auditor_a': {'user': auditor_a, 'expected_return': 200},
'with_admin_b': {'user': admin_b, 'expected_return': 403},
'with_observer_b': {'user': observer_b, 'expected_return': 403},
}
@utils.parameterized_test_case
class RbacTestCase(base.TestCase):
"""Functional tests exercising RBAC Policies"""
def setUp(self):
super(RbacTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client)
self.container_behaviors = container_behaviors.ContainerBehaviors(
self.client)
def tearDown(self):
self.secret_behaviors.delete_all_created_secrets()
self.container_behaviors.delete_all_created_containers()
super(RbacTestCase, self).tearDown()
@utils.parameterized_dataset(test_data_rbac_read_secret)
def test_rbac_read_secret(self, user, expected_return):
secret_ref = self.store_secret()
status = self.get_secret(secret_ref, user_name=user)
self.assertEqual(expected_return, status)
@utils.parameterized_dataset(test_data_rbac_read_container)
def test_rbac_read_container(self, user, expected_return):
container_ref = self.store_container()
status = self.get_container(container_ref, user_name=user)
self.assertEqual(expected_return, status)
# ----------------------- Helper Functions ---------------------------
def store_secret(self, user_name=creator_a, admin=admin_a):
test_model = secret_models.SecretModel(
**get_default_secret_data())
resp, secret_ref = self.secret_behaviors.create_secret(
test_model, user_name=user_name, admin=admin)
self.assertEqual(201, resp.status_code)
return secret_ref
def get_secret(self, secret_ref, user_name=creator_a):
resp = self.secret_behaviors.get_secret(
secret_ref, 'application/octet-stream',
user_name=user_name)
return resp.status_code
def store_container(self, user_name=creator_a, admin=admin_a):
secret_ref = self.store_secret(user_name=user_name, admin=admin)
test_model = container_models.ContainerModel(
**get_container_req(secret_ref))
resp, container_ref = self.container_behaviors.create_container(
test_model, user_name=user_name, admin=admin)
self.assertEqual(201, resp.status_code)
return container_ref
def get_container(self, container_ref, user_name=creator_a):
resp = self.container_behaviors.get_container(
container_ref, user_name=user_name)
return resp.status_code
# ----------------------- Support Functions ---------------------------
def get_default_secret_data():
return {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": get_default_payload(),
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
def get_default_payload():
return 'Z0Y2K2xMb0Yzb2hBOWFQUnB0KzZiUT09'
def get_container_req(secret_ref):
return {"name": "testcontainer",
"type": "generic",
"secret_refs": [{'name': 'secret1', 'secret_ref': secret_ref}]}

View File

@ -1028,7 +1028,8 @@ class SecretsUnauthedTestCase(base.TestCase):
secret_models.SecretModel(**self.default_secret_create_data)
)
stored_auth = self.client._auth.stored_auth
stored_auth = self.client._auth[
self.client._default_user_name].stored_auth
project_id = stored_auth.values()[0]['project_id']
self.project_id_header = {
'X-Project-Id': project_id

View File

@ -79,11 +79,16 @@ class FunctionalTestAuth(auth.AuthBase):
self.stored_auth[self.username] = {
'token': token,
'project_id': project_id,
'service_catalog': self._client.service_catalog
'service_catalog': self._client.service_catalog,
'user_id': self._client.auth_user_id
}
return self.stored_auth[self.username]
def get_user_id(self):
"""Return the UID used by keystone to uniquely identify the user"""
return self.authenticate()['user_id']
def __call__(self, r):
creds = self.authenticate()

View File

@ -31,13 +31,6 @@ CONF = config.get_config()
class BarbicanClient(object):
def __init__(self, api_version='v1'):
self._auth = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.identity.username,
password=CONF.identity.password,
project_name=CONF.identity.project_name
)
self.timeout = 10
self.api_version = api_version
self.default_headers = {
@ -45,6 +38,50 @@ class BarbicanClient(object):
'Accept': 'application/json'
}
self.region = CONF.identity.region
self._default_user_name = CONF.identity.username
self._auth = {}
self._auth[CONF.identity.username] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.identity.username,
password=CONF.identity.password,
project_name=CONF.identity.project_name)
self._auth[CONF.rbac_users.admin_a] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.admin_a,
password=CONF.rbac_users.admin_a_password,
project_name=CONF.rbac_users.project_a)
self._auth[CONF.rbac_users.creator_a] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.creator_a,
password=CONF.rbac_users.creator_a_password,
project_name=CONF.rbac_users.project_a)
self._auth[CONF.rbac_users.observer_a] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.observer_a,
password=CONF.rbac_users.observer_a_password,
project_name=CONF.rbac_users.project_a)
self._auth[CONF.rbac_users.auditor_a] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.auditor_a,
password=CONF.rbac_users.auditor_a_password,
project_name=CONF.rbac_users.project_a)
self._auth[CONF.rbac_users.admin_b] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.admin_b,
password=CONF.rbac_users.admin_b_password,
project_name=CONF.rbac_users.project_b)
self._auth[CONF.rbac_users.observer_b] = auth.FunctionalTestAuth(
endpoint=CONF.identity.uri,
version=CONF.identity.version,
username=CONF.rbac_users.observer_b,
password=CONF.rbac_users.observer_b_password,
project_name=CONF.rbac_users.project_b)
def _attempt_to_stringify_content(self, content, content_tag):
if content is None:
@ -79,10 +116,17 @@ class BarbicanClient(object):
'Request Body: {body}\n'
'Response: {response_body}').format(**format_kwargs)
def log_request(self, request_kwargs, response):
def log_request(self, request_kwargs, response, user_name):
test_name = misc_utils.find_test_caller()
str_request = self.stringify_request(request_kwargs, response)
LOG.info('Request (%s)\n %s', test_name, str_request)
if user_name is None:
user_info = ''
else:
user_info = "(user={0})".format(user_name)
LOG.info('Request %s (%s)\n %s',
user_info,
test_name,
str_request)
def _status_is_2xx_success(self, status_code):
return status_code >= 200 and status_code < 300
@ -128,14 +172,14 @@ class BarbicanClient(object):
if CONF.keymanager.override_url:
return self._get_base_url_from_config(include_version)
endpoint = self._auth.service_catalog.get_endpoints(
auth = self._auth[self._default_user_name]
endpoint = auth.service_catalog.get_endpoints(
service_type='key-manager',
service_name='barbican',
region_name='RegionOne',
endpoint_type='public'
)
endpoint_type='public')
if self._auth.version.lower() == 'v2':
if auth.version.lower() == 'v2':
base_url = endpoint['key-manager'][0].get('publicURL')
else:
base_url = endpoint['key-manager'][0].get('url')
@ -173,7 +217,7 @@ class BarbicanClient(object):
def request(self, method, url, data=None, extra_headers=None,
use_auth=True, response_model_type=None, request_model=None,
params=None):
params=None, user_name=None):
"""Prepares and sends http request through Requests."""
if url and 'http' not in url:
url = urllib.parse.urljoin(self.get_base_url(), url)
@ -198,14 +242,16 @@ class BarbicanClient(object):
'params': params
}
if use_auth:
call_kwargs['auth'] = self._auth
if not user_name:
user_name = self._default_user_name
call_kwargs['auth'] = self._auth[user_name]
response = requests.request(**call_kwargs)
# Attempt to deserialize the response
response.model = self.attempt_to_deserialize(response,
response_model_type)
self.log_request(call_kwargs, response)
self.log_request(call_kwargs, response, user_name)
return response
def get(self, *args, **kwargs):
@ -223,3 +269,9 @@ class BarbicanClient(object):
def delete(self, *args, **kwargs):
"""Proxies the request method specifically for http DELETE methods."""
return self.request('DELETE', *args, **kwargs)
def get_user_id_from_name(self, user_name):
if user_name and self._auth[user_name]:
return self._auth[user_name].get_user_id()
else:
return None

View File

@ -32,11 +32,29 @@ def setup_config(config_file=''):
cfg.StrOpt('password', default='secretadmin'),
cfg.StrOpt('project_name', default='admin'),
cfg.StrOpt('domain_name', default='Default'),
cfg.StrOpt('region', default='RegionOne')
]
cfg.StrOpt('region', default='RegionOne')]
TEST_CONF.register_group(identity_group)
TEST_CONF.register_opts(identity_options, group=identity_group)
rbac_users_group = cfg.OptGroup(name='rbac_users')
rbac_users_options = [
cfg.StrOpt('project_a', default='project_a'),
cfg.StrOpt('project_b', default='project_b'),
cfg.StrOpt('admin_a', default='project_a_admin'),
cfg.StrOpt('admin_a_password', default='barbican'),
cfg.StrOpt('creator_a', default='project_a_creator'),
cfg.StrOpt('creator_a_password', default='barbican'),
cfg.StrOpt('observer_a', default='project_a_observer'),
cfg.StrOpt('observer_a_password', default='barbican'),
cfg.StrOpt('auditor_a', default='project_a_auditor'),
cfg.StrOpt('auditor_a_password', default='barbican'),
cfg.StrOpt('admin_b', default='project_b_admin'),
cfg.StrOpt('admin_b_password', default='barbican'),
cfg.StrOpt('observer_b', default='project_b_observer'),
cfg.StrOpt('observer_b_password', default='barbican')]
TEST_CONF.register_group(rbac_users_group)
TEST_CONF.register_opts(rbac_users_options, group=rbac_users_group)
keymanager_group = cfg.OptGroup(name='keymanager')
keymanager_options = [
cfg.StrOpt('override_url', default=''),