Moving clients to common module
* Moves tests clients to adjutant/common/tests.py * QuotaManager to common/quota.py * openstack_clients and user_store moved to common Change-Id: I143d69dee6b987e9b719a2a07f23a37da576bc27
This commit is contained in:
parent
18c3c2e5e0
commit
3973fcb446
|
@ -18,7 +18,7 @@ from django.conf import settings
|
|||
from django.utils import timezone
|
||||
|
||||
from adjutant.common.quota import QuotaManager
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
from adjutant.actions.models import Action
|
||||
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ import six
|
|||
from django.conf import settings
|
||||
|
||||
from adjutant.actions.v1.base import BaseAction
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
from adjutant.actions.utils import send_email
|
||||
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ from uuid import uuid4
|
|||
|
||||
from django.utils import timezone
|
||||
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
from adjutant.actions.v1.base import (
|
||||
BaseAction, UserNameAction, UserMixin, ProjectMixin)
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
from adjutant.actions.v1.base import BaseAction, ProjectMixin, QuotaMixin
|
||||
from adjutant.actions import openstack_clients, user_store
|
||||
from adjutant.common import openstack_clients, user_store
|
||||
from adjutant.api import models
|
||||
from adjutant.common.quota import QuotaManager
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
|
||||
from rest_framework import serializers
|
||||
from django.conf import settings
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
|
||||
|
||||
role_options = settings.DEFAULT_ACTION_SETTINGS.get("NewUserAction", {}).get(
|
||||
|
|
|
@ -1,316 +0,0 @@
|
|||
# Copyright (C) 2015 Catalyst IT Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
neutron_cache = {}
|
||||
nova_cache = {}
|
||||
cinder_cache = {}
|
||||
|
||||
|
||||
class FakeOpenstackClient(object):
|
||||
class Quotas(object):
|
||||
""" Stub class for testing quotas """
|
||||
def __init__(self, service):
|
||||
self.service = service
|
||||
|
||||
def update(self, project_id, **kwargs):
|
||||
self.service.update_quota(project_id, **kwargs)
|
||||
|
||||
def get(self, project_id):
|
||||
return self.QuotaSet(
|
||||
self.service._cache[self.service.region][project_id]['quota'])
|
||||
|
||||
class QuotaSet(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def to_dict(self):
|
||||
return self.data
|
||||
|
||||
def __init__(self, region, cache):
|
||||
self.region = region
|
||||
self._cache = cache
|
||||
self.quotas = FakeOpenstackClient.Quotas(self)
|
||||
|
||||
def update_quota(self, project_id, **kwargs):
|
||||
if self.region not in self._cache:
|
||||
self._cache[self.region] = {}
|
||||
if project_id not in self._cache[self.region]:
|
||||
self._cache[self.region][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
quota = self._cache[self.region][project_id]['quota']
|
||||
quota.update(kwargs)
|
||||
|
||||
|
||||
class FakeNeutronClient(object):
|
||||
|
||||
def __init__(self, region):
|
||||
self.region = region
|
||||
|
||||
def create_network(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['network']['tenant_id']
|
||||
net = {'network': {'id': 'net_id_%s' % neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
net_id = net['network']['id']
|
||||
neutron_cache['RegionOne'][project_id]['networks'][net_id] = net
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return net
|
||||
|
||||
def create_subnet(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['subnet']['tenant_id']
|
||||
subnet = {'subnet': {'id': 'subnet_id_%s'
|
||||
% neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
sub_id = subnet['subnet']['id']
|
||||
neutron_cache['RegionOne'][project_id]['subnets'][sub_id] = subnet
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return subnet
|
||||
|
||||
def create_router(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['router']['tenant_id']
|
||||
router = {'router': {'id': 'router_id_%s'
|
||||
% neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
router_id = router['router']['id']
|
||||
neutron_cache['RegionOne'][project_id]['routers'][router_id] = router
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return router
|
||||
|
||||
def add_interface_router(self, router_id, body):
|
||||
global neutron_cache
|
||||
port_id = "port_id_%s" % neutron_cache['RegionOne']['i']
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
interface = {
|
||||
'port_id': port_id,
|
||||
'id': router_id,
|
||||
'subnet_id': body['subnet_id']}
|
||||
return interface
|
||||
|
||||
def update_quota(self, project_id, body):
|
||||
global neutron_cache
|
||||
if self.region not in neutron_cache:
|
||||
neutron_cache[self.region] = {}
|
||||
if project_id not in neutron_cache[self.region]:
|
||||
neutron_cache[self.region][project_id] = {}
|
||||
|
||||
if 'quota' not in neutron_cache[self.region][project_id]:
|
||||
neutron_cache[self.region][project_id]['quota'] = {}
|
||||
|
||||
quota = neutron_cache[self.region][project_id]['quota']
|
||||
quota.update(body['quota'])
|
||||
|
||||
def show_quota(self, project_id):
|
||||
return {"quota": neutron_cache[self.region][project_id]['quota']}
|
||||
|
||||
def list_networks(self, tenant_id):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_routers(self, tenant_id):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_subnets(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_security_groups(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_floatingips(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_security_group_rules(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_ports(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
|
||||
class FakeNovaClient(FakeOpenstackClient):
|
||||
|
||||
def __init__(self, region):
|
||||
global nova_cache
|
||||
super(FakeNovaClient, self).__init__(region, nova_cache)
|
||||
self.limits = self.LimitFakers(nova_cache[region])
|
||||
|
||||
class LimitFakers(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def get(self, tenant_id):
|
||||
return self.LimitFake(self.data, tenant_id)
|
||||
|
||||
class LimitFake(object):
|
||||
def __init__(self, data, project_id):
|
||||
self.project_id = project_id
|
||||
self.data = data
|
||||
|
||||
def to_dict(self):
|
||||
return self.data[self.project_id]
|
||||
|
||||
|
||||
class FakeCinderClient(FakeOpenstackClient):
|
||||
class FakeResourceGroup(object):
|
||||
""" Stub class to represent volumes and snapshots """
|
||||
|
||||
def __init__(self, region, cache_key):
|
||||
self.region = region
|
||||
self.key = cache_key
|
||||
|
||||
def list(self, search_opts=None):
|
||||
if search_opts:
|
||||
project_id = search_opts['project_id']
|
||||
# TODO: This should return data from the cache so that it
|
||||
# can be set up properly
|
||||
global cinder_cache
|
||||
return cinder_cache[self.region][project_id][self.key]
|
||||
|
||||
def __init__(self, region):
|
||||
global cinder_cache
|
||||
self.region = region
|
||||
self._cache = cinder_cache
|
||||
self.quotas = FakeOpenstackClient.Quotas(self)
|
||||
self.volumes = self.FakeResourceGroup(region, 'volumes')
|
||||
self.volume_snapshots = self.FakeResourceGroup(region,
|
||||
'volume_snapshots')
|
||||
|
||||
|
||||
class FakeResource(object):
|
||||
""" Stub class to represent an individual instance of a volume or
|
||||
snapshot """
|
||||
|
||||
def __init__(self, size):
|
||||
self.size = size
|
||||
|
||||
|
||||
def setup_neutron_cache(region, project_id):
|
||||
global neutron_cache
|
||||
if region not in neutron_cache:
|
||||
neutron_cache[region] = {'i': 0}
|
||||
else:
|
||||
neutron_cache[region]['i'] = 0
|
||||
if project_id not in neutron_cache[region]:
|
||||
neutron_cache[region][project_id] = {}
|
||||
|
||||
neutron_cache[region][project_id] = {
|
||||
'networks': {},
|
||||
'subnets': {},
|
||||
'routers': {},
|
||||
'security_groups': {},
|
||||
'floatingips': {},
|
||||
'security_group_rules': {},
|
||||
'ports': {},
|
||||
}
|
||||
|
||||
neutron_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['neutron'])
|
||||
|
||||
|
||||
def setup_cinder_cache(region, project_id):
|
||||
global cinder_cache
|
||||
if region not in cinder_cache:
|
||||
cinder_cache[region] = {}
|
||||
if project_id not in cinder_cache[region]:
|
||||
cinder_cache[region][project_id] = {}
|
||||
|
||||
cinder_cache[region][project_id] = {
|
||||
'volumes': [],
|
||||
'volume_snapshots': [],
|
||||
}
|
||||
|
||||
cinder_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['cinder'])
|
||||
|
||||
|
||||
def setup_nova_cache(region, project_id):
|
||||
global nova_cache
|
||||
if region not in nova_cache:
|
||||
nova_cache[region] = {}
|
||||
if project_id not in nova_cache[region]:
|
||||
nova_cache[region][project_id] = {}
|
||||
|
||||
# Mocking the nova limits api
|
||||
nova_cache[region][project_id] = {
|
||||
'absolute': {
|
||||
"totalInstancesUsed": 0,
|
||||
"totalFloatingIpsUsed": 0,
|
||||
"totalRAMUsed": 0,
|
||||
"totalCoresUsed": 0,
|
||||
"totalSecurityGroupsUsed": 0
|
||||
}
|
||||
}
|
||||
nova_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['nova'])
|
||||
|
||||
|
||||
def setup_quota_cache(region_name, project_id, size='small'):
|
||||
""" Sets up the quota cache for a given region and project """
|
||||
global cinder_cache
|
||||
|
||||
if region_name not in cinder_cache:
|
||||
cinder_cache[region_name] = {}
|
||||
|
||||
if project_id not in cinder_cache[region_name]:
|
||||
cinder_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
cinder_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['cinder'])
|
||||
|
||||
global nova_cache
|
||||
if region_name not in nova_cache:
|
||||
nova_cache[region_name] = {}
|
||||
|
||||
if project_id not in nova_cache[region_name]:
|
||||
nova_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
nova_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['nova'])
|
||||
|
||||
global neutron_cache
|
||||
if region_name not in neutron_cache:
|
||||
neutron_cache[region_name] = {}
|
||||
|
||||
if project_id not in neutron_cache[region_name]:
|
||||
neutron_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
neutron_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['neutron'])
|
||||
|
||||
|
||||
def setup_mock_caches(region, project_id):
|
||||
setup_nova_cache(region, project_id)
|
||||
setup_cinder_cache(region, project_id)
|
||||
setup_neutron_cache(region, project_id)
|
||||
|
||||
|
||||
def get_fake_neutron(region):
|
||||
return FakeNeutronClient(region)
|
||||
|
||||
|
||||
def get_fake_novaclient(region):
|
||||
return FakeNovaClient(region)
|
||||
|
||||
|
||||
def get_fake_cinderclient(region):
|
||||
global cinder_cache
|
||||
return FakeCinderClient(region)
|
|
@ -19,8 +19,8 @@ from django.core import mail
|
|||
from adjutant.actions.v1.misc import SendAdditionalEmailAction
|
||||
from adjutant.actions.utils import send_email
|
||||
from adjutant.api.models import Task
|
||||
from adjutant.api.v1.tests import (FakeManager,
|
||||
modify_dict_settings, AdjutantTestCase)
|
||||
from adjutant.common.tests.fake_clients import FakeManager
|
||||
from adjutant.common.tests.utils import modify_dict_settings, AdjutantTestCase
|
||||
from smtplib import SMTPException
|
||||
|
||||
default_email_conf = {
|
||||
|
@ -37,7 +37,7 @@ class FailEmail(mock.MagicMock):
|
|||
raise SMTPException
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class MiscActionTests(AdjutantTestCase):
|
||||
|
||||
|
|
|
@ -21,12 +21,12 @@ from adjutant.actions.v1.projects import (
|
|||
NewProjectWithUserAction, AddDefaultUsersToProjectAction,
|
||||
NewProjectAction)
|
||||
from adjutant.api.models import Task
|
||||
from adjutant.api.v1 import tests
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
modify_dict_settings)
|
||||
from adjutant.common.tests import fake_clients
|
||||
from adjutant.common.tests.fake_clients import FakeManager, setup_temp_cache
|
||||
from adjutant.common.tests.utils import modify_dict_settings
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class ProjectActionTests(TestCase):
|
||||
|
||||
|
@ -60,7 +60,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -74,12 +74,12 @@ class ProjectActionTests(TestCase):
|
|||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["user_id_1"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -113,7 +113,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -123,7 +123,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -135,9 +135,9 @@ class ProjectActionTests(TestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["user_id_1"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -188,9 +188,9 @@ class ProjectActionTests(TestCase):
|
|||
self.assertTrue("user_id" in action.action.cache)
|
||||
self.assertFalse("roles_granted" in action.action.cache)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertFalse("user_id_1" in project.roles)
|
||||
|
||||
# And then swap back the correct function
|
||||
|
@ -205,7 +205,7 @@ class ProjectActionTests(TestCase):
|
|||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["user_id_1"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -244,7 +244,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -256,9 +256,9 @@ class ProjectActionTests(TestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].email,
|
||||
fake_clients.identity_temp_cache['users'][user.id].email,
|
||||
'test@example.com')
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles[user.id]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -301,7 +301,8 @@ class ProjectActionTests(TestCase):
|
|||
self.assertEquals(action.valid, False)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects'].get('test_project'), None)
|
||||
fake_clients.identity_temp_cache['projects'].get('test_project'),
|
||||
None)
|
||||
|
||||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
|
@ -334,14 +335,14 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
{'project_id': 'project_id_1', 'user_id': 'user_id_1',
|
||||
'user_state': 'default'})
|
||||
|
||||
tests.temp_cache['projects'] = {}
|
||||
fake_clients.identity_temp_cache['projects'] = {}
|
||||
|
||||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
|
@ -374,14 +375,14 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
{'project_id': 'project_id_1', 'user_id': 'user_id_1',
|
||||
'user_state': 'default'})
|
||||
|
||||
tests.temp_cache['users'] = {}
|
||||
fake_clients.identity_temp_cache['users'] = {}
|
||||
|
||||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
|
@ -423,7 +424,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -441,14 +442,14 @@ class ProjectActionTests(TestCase):
|
|||
|
||||
# check that user has been created correctly
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].email,
|
||||
fake_clients.identity_temp_cache['users'][user.id].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].enabled,
|
||||
fake_clients.identity_temp_cache['users'][user.id].enabled,
|
||||
True)
|
||||
|
||||
# Check user has correct roles in new project
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles[user.id]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -498,7 +499,7 @@ class ProjectActionTests(TestCase):
|
|||
# approve previous signup
|
||||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
project.name,
|
||||
'test_project')
|
||||
|
@ -616,7 +617,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
self.assertEquals(
|
||||
task.cache,
|
||||
|
@ -627,12 +628,12 @@ class ProjectActionTests(TestCase):
|
|||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test_user')
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["user_id_1"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -674,7 +675,7 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(project.roles['user_id_0'], ['admin'])
|
||||
|
||||
def test_add_default_users_invalid_project(self):
|
||||
|
@ -737,13 +738,13 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(project.roles['user_id_0'], ['admin'])
|
||||
|
||||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(project.roles['user_id_0'], ['admin'])
|
||||
|
||||
def test_new_project_action(self):
|
||||
|
@ -782,14 +783,14 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].parent,
|
||||
'parent_project')
|
||||
fake_clients.identity_temp_cache['projects'][
|
||||
'test_project'].parent, 'parent_project')
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["test_user_id"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -834,25 +835,25 @@ class ProjectActionTests(TestCase):
|
|||
action.post_approve()
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].parent,
|
||||
'parent_project')
|
||||
fake_clients.identity_temp_cache[
|
||||
'projects']['test_project'].parent, 'parent_project')
|
||||
|
||||
action.post_approve()
|
||||
# Nothing should change
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].name,
|
||||
fake_clients.identity_temp_cache['projects']['test_project'].name,
|
||||
'test_project')
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].parent,
|
||||
'parent_project')
|
||||
fake_clients.identity_temp_cache[
|
||||
'projects']['test_project'].parent, 'parent_project')
|
||||
|
||||
project = tests.temp_cache['projects']['test_project']
|
||||
project = fake_clients.identity_temp_cache['projects']['test_project']
|
||||
self.assertEquals(
|
||||
sorted(project.roles["test_user_id"]),
|
||||
sorted(['_member_', 'project_admin',
|
||||
|
@ -978,8 +979,8 @@ class ProjectActionTests(TestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['projects']['test_project'].parent,
|
||||
'default')
|
||||
fake_clients.identity_temp_cache[
|
||||
'projects']['test_project'].parent, 'default')
|
||||
|
||||
action.submit({})
|
||||
self.assertEquals(action.valid, True)
|
||||
|
|
|
@ -21,15 +21,14 @@ from adjutant.actions.v1.resources import (
|
|||
NewDefaultNetworkAction, NewProjectDefaultNetworkAction,
|
||||
SetProjectQuotaAction, UpdateProjectQuotasAction)
|
||||
from adjutant.api.models import Task
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
modify_dict_settings)
|
||||
from adjutant.actions.v1.tests import (
|
||||
get_fake_neutron, get_fake_novaclient, get_fake_cinderclient,
|
||||
setup_neutron_cache, neutron_cache, cinder_cache, nova_cache,
|
||||
setup_mock_caches)
|
||||
from adjutant.common.tests.utils import modify_dict_settings
|
||||
from adjutant.common.tests.fake_clients import (
|
||||
FakeManager, setup_temp_cache, get_fake_neutron, get_fake_novaclient,
|
||||
get_fake_cinderclient, setup_neutron_cache, neutron_cache, cinder_cache,
|
||||
nova_cache, setup_mock_caches)
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
@mock.patch(
|
||||
'adjutant.actions.v1.resources.' +
|
||||
|
@ -465,7 +464,7 @@ class ProjectSetupActionTests(TestCase):
|
|||
|
||||
|
||||
@mock.patch(
|
||||
'adjutant.actions.user_store.IdentityManager',
|
||||
'adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
@mock.patch(
|
||||
'adjutant.common.quota.get_neutronclient',
|
||||
|
|
|
@ -20,13 +20,13 @@ from adjutant.actions.v1.users import (
|
|||
EditUserRolesAction, NewUserAction, ResetUserPasswordAction,
|
||||
UpdateUserEmailAction)
|
||||
from adjutant.api.models import Task
|
||||
from adjutant.api.v1 import tests
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
modify_dict_settings, AdjutantTestCase)
|
||||
from adjutant.common.tests import fake_clients
|
||||
from adjutant.common.tests.fake_clients import setup_temp_cache
|
||||
from adjutant.common.tests.utils import modify_dict_settings, AdjutantTestCase
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
fake_clients.FakeManager)
|
||||
class UserActionTests(AdjutantTestCase):
|
||||
|
||||
def test_new_user(self):
|
||||
|
@ -68,13 +68,13 @@ class UserActionTests(AdjutantTestCase):
|
|||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(len(tests.temp_cache['users']), 2)
|
||||
self.assertEquals(len(fake_clients.identity_temp_cache['users']), 2)
|
||||
# The new user id in this case will be "user_id_1"
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].password,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].password,
|
||||
'123456')
|
||||
|
||||
self.assertEquals(project.roles["user_id_1"], ['_member_'])
|
||||
|
@ -171,16 +171,16 @@ class UserActionTests(AdjutantTestCase):
|
|||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(len(tests.temp_cache['users']), 2)
|
||||
self.assertEquals(len(fake_clients.identity_temp_cache['users']), 2)
|
||||
# The new user id in this case will be "user_id_1"
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].password,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].password,
|
||||
'123456')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].enabled,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].enabled,
|
||||
True)
|
||||
|
||||
self.assertEquals(project.roles["user_id_1"], ['_member_'])
|
||||
|
@ -435,7 +435,7 @@ class UserActionTests(AdjutantTestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].password,
|
||||
fake_clients.identity_temp_cache['users'][user.id].password,
|
||||
'123456')
|
||||
|
||||
def test_reset_user_password_no_user(self):
|
||||
|
@ -800,7 +800,7 @@ class UserActionTests(AdjutantTestCase):
|
|||
|
||||
setup_temp_cache({'test_project': project}, {user.id: user})
|
||||
|
||||
tests.temp_cache['roles']['new_role'] = 'new_role'
|
||||
fake_clients.identity_temp_cache['roles']['new_role'] = 'new_role'
|
||||
|
||||
task = Task.objects.create(
|
||||
ip_address="0.0.0.0",
|
||||
|
@ -877,16 +877,16 @@ class UserActionTests(AdjutantTestCase):
|
|||
token_data = {'password': '123456'}
|
||||
action.submit(token_data)
|
||||
self.assertEquals(action.valid, True)
|
||||
self.assertEquals(len(tests.temp_cache['users']), 2)
|
||||
self.assertEquals(len(fake_clients.identity_temp_cache['users']), 2)
|
||||
# The new user id in this case will be "user_id_1"
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test_user')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].password,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].password,
|
||||
'123456')
|
||||
|
||||
self.assertEquals(project.roles["user_id_1"], ['_member_'])
|
||||
|
@ -935,13 +935,13 @@ class UserActionTests(AdjutantTestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].password,
|
||||
fake_clients.identity_temp_cache['users'][user.id].password,
|
||||
'123456')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].name,
|
||||
fake_clients.identity_temp_cache['users'][user.id].name,
|
||||
'test_user')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users'][user.id].email,
|
||||
fake_clients.identity_temp_cache['users'][user.id].email,
|
||||
'test@example.com')
|
||||
|
||||
@override_settings(USERNAME_IS_EMAIL=True)
|
||||
|
@ -990,11 +990,11 @@ class UserActionTests(AdjutantTestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'new_test@example.com')
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'new_test@example.com')
|
||||
|
||||
@override_settings(USERNAME_IS_EMAIL=True)
|
||||
|
@ -1086,10 +1086,10 @@ class UserActionTests(AdjutantTestCase):
|
|||
self.assertEquals(action.valid, False)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'test@example.com')
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test@example.com')
|
||||
|
||||
@override_settings(USERNAME_IS_EMAIL=False)
|
||||
|
@ -1136,9 +1136,9 @@ class UserActionTests(AdjutantTestCase):
|
|||
self.assertEquals(action.valid, True)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].email,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].email,
|
||||
'new_testexample.com')
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test_user')
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
from django.conf import settings
|
||||
from django.db import models
|
||||
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
from adjutant.actions.v1.base import (
|
||||
UserNameAction, UserIdAction, UserMixin, ProjectMixin)
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ from django.utils import timezone
|
|||
|
||||
from rest_framework.response import Response
|
||||
|
||||
from adjutant.actions import user_store
|
||||
from adjutant.common import user_store
|
||||
from adjutant.api import models
|
||||
from adjutant.api import utils
|
||||
from adjutant.api.v1 import tasks
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
from rest_framework.response import Response
|
||||
from adjutant.actions.user_store import IdentityManager
|
||||
from adjutant.common.user_store import IdentityManager
|
||||
from adjutant.api.models import Task
|
||||
from django.utils import timezone
|
||||
from adjutant.api import utils
|
||||
|
|
|
@ -1,471 +0,0 @@
|
|||
# Copyright (C) 2015 Catalyst IT Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import mock
|
||||
import copy
|
||||
|
||||
from django.conf import settings
|
||||
from django.test.utils import override_settings
|
||||
from django.test import TestCase
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
temp_cache = {}
|
||||
|
||||
|
||||
def setup_temp_cache(projects, users):
|
||||
default_domain = mock.Mock()
|
||||
default_domain.id = 'default'
|
||||
default_domain.name = 'Default'
|
||||
|
||||
admin_user = mock.Mock()
|
||||
admin_user.id = 'user_id_0'
|
||||
admin_user.name = 'admin'
|
||||
admin_user.password = 'password'
|
||||
admin_user.email = 'admin@example.com'
|
||||
admin_user.domain = default_domain.id
|
||||
|
||||
users.update({admin_user.id: admin_user})
|
||||
|
||||
region_one = mock.Mock()
|
||||
region_one.id = 'RegionOne'
|
||||
region_one.name = 'RegionOne'
|
||||
|
||||
region_two = mock.Mock()
|
||||
region_two.id = 'RegionTwo'
|
||||
|
||||
global temp_cache
|
||||
|
||||
# TODO(adriant): region and project keys are name, should be ID.
|
||||
temp_cache = {
|
||||
'i': 1,
|
||||
'users': users,
|
||||
'projects': projects,
|
||||
'roles': {
|
||||
'_member_': '_member_',
|
||||
'admin': 'admin',
|
||||
'project_admin': 'project_admin',
|
||||
'project_mod': 'project_mod',
|
||||
'heat_stack_owner': 'heat_stack_owner'
|
||||
},
|
||||
'regions': {
|
||||
'RegionOne': region_one,
|
||||
'RegionTwo': region_two
|
||||
},
|
||||
'domains': {
|
||||
default_domain.id: default_domain,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class FakeManager(object):
|
||||
|
||||
def _project_from_id(self, project):
|
||||
if isinstance(project, mock.Mock):
|
||||
return project
|
||||
else:
|
||||
return self.get_project(project)
|
||||
|
||||
def _role_from_id(self, role):
|
||||
if isinstance(role, mock.Mock):
|
||||
return role
|
||||
else:
|
||||
return self.get_role(role)
|
||||
|
||||
def _user_from_id(self, user):
|
||||
if isinstance(user, mock.Mock):
|
||||
return user
|
||||
else:
|
||||
return self.get_user(user)
|
||||
|
||||
def _domain_from_id(self, domain):
|
||||
if isinstance(domain, mock.Mock):
|
||||
return domain
|
||||
else:
|
||||
return self.get_domain(domain)
|
||||
|
||||
def find_user(self, name, domain):
|
||||
domain = self._domain_from_id(domain)
|
||||
global temp_cache
|
||||
for user in temp_cache['users'].values():
|
||||
if user.name == name and user.domain == domain.id:
|
||||
return user
|
||||
return None
|
||||
|
||||
def get_user(self, user_id):
|
||||
global temp_cache
|
||||
return temp_cache['users'].get(user_id, None)
|
||||
|
||||
def list_users(self, project):
|
||||
project = self._project_from_id(project)
|
||||
global temp_cache
|
||||
roles = temp_cache['projects'][project.name].roles
|
||||
users = []
|
||||
|
||||
for user_id, user_roles in roles.items():
|
||||
user = self.get_user(user_id)
|
||||
user.roles = []
|
||||
|
||||
for role in user_roles:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
user.roles.append(r)
|
||||
|
||||
users.append(user)
|
||||
return users
|
||||
|
||||
def create_user(self, name, password, email, created_on,
|
||||
domain='default', default_project=None):
|
||||
domain = self._domain_from_id(domain)
|
||||
default_project = self._project_from_id(default_project)
|
||||
global temp_cache
|
||||
user = mock.Mock()
|
||||
user.id = "user_id_%s" % int(temp_cache['i'])
|
||||
user.name = name
|
||||
user.password = password
|
||||
user.email = email
|
||||
user.domain = domain.id
|
||||
user.default_project = default_project
|
||||
temp_cache['users'][user.id] = user
|
||||
|
||||
temp_cache['i'] += 0.5
|
||||
return user
|
||||
|
||||
def update_user_password(self, user, password):
|
||||
user = self._user_from_id(user)
|
||||
user.password = password
|
||||
|
||||
def update_user_name(self, user, username):
|
||||
user = self._user_from_id(user)
|
||||
user.name = username
|
||||
|
||||
def update_user_email(self, user, email):
|
||||
user = self._user_from_id(user)
|
||||
user.email = email
|
||||
|
||||
def enable_user(self, user):
|
||||
user = self._user_from_id(user)
|
||||
user.enabled = True
|
||||
|
||||
def disable_user(self, user):
|
||||
user = self._user_from_id(user)
|
||||
user.enabled = False
|
||||
|
||||
def find_role(self, name):
|
||||
global temp_cache
|
||||
if temp_cache['roles'].get(name, None):
|
||||
role = mock.Mock()
|
||||
role.name = name
|
||||
return role
|
||||
return None
|
||||
|
||||
def get_roles(self, user, project):
|
||||
user = self._user_from_id(user)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
roles = []
|
||||
for role in project.roles[user.id]:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
roles.append(r)
|
||||
return roles
|
||||
except KeyError:
|
||||
return []
|
||||
|
||||
def get_all_roles(self, user):
|
||||
user = self._user_from_id(user)
|
||||
global temp_cache
|
||||
projects = {}
|
||||
for project in temp_cache['projects'].values():
|
||||
projects[project.id] = []
|
||||
for role in project.roles[user.id]:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
projects[project.id].append(r)
|
||||
|
||||
return projects
|
||||
|
||||
def add_user_role(self, user, role, project):
|
||||
user = self._user_from_id(user)
|
||||
role = self._role_from_id(role)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
project.roles[user.id].append(role.name)
|
||||
except KeyError:
|
||||
project.roles[user.id] = [role.name]
|
||||
|
||||
def remove_user_role(self, user, role, project):
|
||||
user = self._user_from_id(user)
|
||||
role = self._role_from_id(role)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
project.roles[user.id].remove(role.name)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def find_project(self, project_name, domain):
|
||||
domain = self._domain_from_id(domain)
|
||||
global temp_cache
|
||||
for project in temp_cache['projects'].values():
|
||||
if project.name == project_name and project.domain == domain.id:
|
||||
return project
|
||||
return None
|
||||
|
||||
def get_project(self, project_id):
|
||||
global temp_cache
|
||||
for project in temp_cache['projects'].values():
|
||||
if project.id == project_id:
|
||||
return project
|
||||
|
||||
def create_project(self, project_name, created_on, parent=None,
|
||||
domain='default', p_id=None):
|
||||
parent = self._project_from_id(parent)
|
||||
domain = self._domain_from_id(domain)
|
||||
global temp_cache
|
||||
project = mock.Mock()
|
||||
if p_id:
|
||||
project.id = p_id
|
||||
else:
|
||||
temp_cache['i'] += 0.5
|
||||
project.id = "project_id_%s" % int(temp_cache['i'])
|
||||
project.name = project_name
|
||||
if parent:
|
||||
project.parent = parent.id
|
||||
else:
|
||||
project.parent = domain.id
|
||||
project.domain = domain.id
|
||||
project.roles = {}
|
||||
temp_cache['projects'][project_name] = project
|
||||
return project
|
||||
|
||||
def update_project(self, project, **kwargs):
|
||||
project = self._project_from_id(project)
|
||||
for key, arg in kwargs.items():
|
||||
if arg is not None:
|
||||
setattr(project, key, arg)
|
||||
return project
|
||||
|
||||
def find_domain(self, domain_name):
|
||||
global temp_cache
|
||||
for domain in temp_cache['domains'].values():
|
||||
if domain.name == domain_name:
|
||||
return domain
|
||||
return None
|
||||
|
||||
def get_domain(self, domain_id):
|
||||
global temp_cache
|
||||
return temp_cache['domains'].get(domain_id, None)
|
||||
|
||||
def get_region(self, region_id):
|
||||
global temp_cache
|
||||
return temp_cache['regions'].get(region_id, None)
|
||||
|
||||
def list_regions(self):
|
||||
global temp_cache
|
||||
return temp_cache['regions'].values()
|
||||
|
||||
|
||||
class modify_dict_settings(override_settings):
|
||||
"""
|
||||
A decorator like djangos modify_settings and override_settings, but makes
|
||||
it possible to do those same operations on dict based settings.
|
||||
|
||||
The decorator will act after both override_settings and modify_settings.
|
||||
|
||||
Can be applied to test functions or AdjutantTestCase,
|
||||
AdjutantAPITestCase classes. In those two classes settings can also
|
||||
be modified using:
|
||||
|
||||
with self.modify_dict_settings(...):
|
||||
# code
|
||||
|
||||
Example Usage:
|
||||
@modify_dict_settings(ROLES_MAPPING=[
|
||||
{'key_list': ['project_mod'],
|
||||
'operation': 'remove',
|
||||
'value': 'heat_stack_owner'},
|
||||
{'key_list': ['project_admin'],
|
||||
'operation': 'append',
|
||||
'value': 'heat_stack_owner'},
|
||||
])
|
||||
or
|
||||
@modify_dict_settings(PROJECT_QUOTA_SIZES={
|
||||
'key_list': ['small', 'nova', 'instances'],
|
||||
'operations': 'override',
|
||||
'value': 11
|
||||
})
|
||||
|
||||
Available operations:
|
||||
Standard operations:
|
||||
- 'update': A dict on dict operation to update final dict with value.
|
||||
- 'override': Either overrides or adds the value to the dictionary.
|
||||
- 'delete': Removes the value from the dictionary.
|
||||
|
||||
List operations:
|
||||
List operations expect that the accessed value in the dictionary is a list.
|
||||
- 'append': Add the specified values to the end of the list
|
||||
- 'prepend': Add the specifed values to the start of the list
|
||||
- 'remove': Remove the specified values from the list
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if args:
|
||||
# Hack used when instantiating from SimpleTestCase.setUpClass.
|
||||
assert not kwargs
|
||||
self.operations = args[0]
|
||||
else:
|
||||
assert not args
|
||||
self.operations = list(kwargs.items())
|
||||
super(override_settings, self).__init__()
|
||||
|
||||
def save_options(self, test_func):
|
||||
if getattr(test_func, "_modified_dict_settings", None) is None:
|
||||
test_func._modified_dict_settings = self.operations
|
||||
else:
|
||||
# Duplicate list to prevent subclasses from altering their parent.
|
||||
test_func._modified_dict_settings = list(
|
||||
test_func._modified_dict_settings) + self.operations
|
||||
|
||||
def disable(self):
|
||||
self.wrapped = self._wrapped
|
||||
for update_dict in self.update_dicts:
|
||||
update_dict['pointer'].clear()
|
||||
update_dict['pointer'].update(update_dict['copy'])
|
||||
|
||||
super(modify_dict_settings, self).disable()
|
||||
|
||||
def enable(self):
|
||||
self.options = {}
|
||||
|
||||
self.update_dicts = []
|
||||
self._wrapped = copy.deepcopy(settings._wrapped)
|
||||
|
||||
for name, operation_list in self.operations:
|
||||
try:
|
||||
value = self.options[name]
|
||||
except KeyError:
|
||||
value = getattr(settings, name, [])
|
||||
|
||||
if not isinstance(value, dict):
|
||||
raise ValueError("Initial setting not dictionary.")
|
||||
|
||||
if not isinstance(operation_list, list):
|
||||
operation_list = [operation_list]
|
||||
|
||||
for operation in operation_list:
|
||||
op_type = operation['operation']
|
||||
|
||||
holding_dict = value
|
||||
|
||||
# Recursively find the dict we want
|
||||
key_len = len(operation['key_list'])
|
||||
final_key = operation['key_list'][0]
|
||||
|
||||
for i in range(key_len):
|
||||
current_key = operation['key_list'][i]
|
||||
if i == (key_len - 1):
|
||||
final_key = current_key
|
||||
else:
|
||||
try:
|
||||
holding_dict = holding_dict[current_key]
|
||||
except KeyError:
|
||||
holding_dict[current_key] = {}
|
||||
holding_dict = holding_dict[current_key]
|
||||
|
||||
if op_type == "override":
|
||||
holding_dict[final_key] = operation['value']
|
||||
elif op_type == "delete":
|
||||
del holding_dict[final_key]
|
||||
elif op_type == "update":
|
||||
# Needs to be saved seperately and update re-used on
|
||||
# disable due to pointers
|
||||
self.update_dicts.append(
|
||||
{'pointer': holding_dict[final_key],
|
||||
'copy': copy.deepcopy(holding_dict[final_key])})
|
||||
holding_dict[final_key].update(operation['value'])
|
||||
else:
|
||||
val = holding_dict.get(final_key, [])
|
||||
items = operation['value']
|
||||
|
||||
if not isinstance(items, list):
|
||||
items = [items]
|
||||
|
||||
if op_type == 'append':
|
||||
holding_dict[final_key] = val + [
|
||||
item for item in items if item not in val]
|
||||
elif op_type == 'prepend':
|
||||
holding_dict[final_key] = ([item for item in items if
|
||||
item not in val] + val)
|
||||
elif op_type == 'remove':
|
||||
holding_dict[final_key] = [
|
||||
item for item in val if item not in items]
|
||||
else:
|
||||
raise ValueError("Unsupported action: %s" % op_type)
|
||||
self.options[name] = value
|
||||
super(modify_dict_settings, self).enable()
|
||||
|
||||
|
||||
class TestCaseMixin(object):
|
||||
""" Mixin to add modify_dict_settings functions to test classes """
|
||||
|
||||
@classmethod
|
||||
def _apply_settings_changes(cls):
|
||||
if getattr(cls, '_modified_dict_settings', None):
|
||||
operations = {}
|
||||
for key, value in cls._modified_dict_settings:
|
||||
operations[key] = value
|
||||
cls._cls_modified_dict_context = modify_dict_settings(
|
||||
**operations)
|
||||
cls._cls_modified_dict_context.enable()
|
||||
|
||||
@classmethod
|
||||
def _remove_settings_changes(cls):
|
||||
if hasattr(cls, '_cls_modified_dict_context'):
|
||||
cls._cls_modified_dict_context.disable()
|
||||
delattr(cls, '_cls_modified_dict_context')
|
||||
|
||||
def modify_dict_settings(self, **kwargs):
|
||||
return modify_dict_settings(**kwargs)
|
||||
|
||||
|
||||
class AdjutantTestCase(TestCase, TestCaseMixin):
|
||||
"""
|
||||
TestCase override that has support for @modify_dict_settings as a
|
||||
class decorator and internal function
|
||||
"""
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(AdjutantTestCase, cls).setUpClass()
|
||||
cls._apply_settings_changes()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls._remove_settings_changes()
|
||||
super(AdjutantTestCase, cls).tearDownClass()
|
||||
|
||||
|
||||
class AdjutantAPITestCase(APITestCase, TestCaseMixin):
|
||||
"""
|
||||
APITestCase override that has support for @modify_dict_settings as a
|
||||
class decorator, and internal function
|
||||
"""
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(AdjutantAPITestCase, cls).setUpClass()
|
||||
cls._apply_settings_changes()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls._remove_settings_changes()
|
||||
super(AdjutantAPITestCase, cls).tearDownClass()
|
|
@ -27,11 +27,11 @@ from rest_framework import status
|
|||
from rest_framework.test import APITestCase
|
||||
|
||||
from adjutant.api.models import Task, Token, Notification
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
modify_dict_settings)
|
||||
from adjutant.common.tests.fake_clients import FakeManager, setup_temp_cache
|
||||
from adjutant.common.tests.utils import modify_dict_settings
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class AdminAPITests(APITestCase):
|
||||
"""
|
||||
|
|
|
@ -22,16 +22,15 @@ from django.utils import timezone
|
|||
from django.conf import settings
|
||||
|
||||
from adjutant.api.models import Token, Task
|
||||
from adjutant.api.v1.tests import FakeManager, setup_temp_cache
|
||||
from adjutant.actions.v1.tests import (
|
||||
get_fake_neutron, get_fake_novaclient, get_fake_cinderclient,
|
||||
cinder_cache, nova_cache, neutron_cache,
|
||||
from adjutant.common.tests.fake_clients import (
|
||||
FakeManager, setup_temp_cache, get_fake_neutron, get_fake_novaclient,
|
||||
get_fake_cinderclient, cinder_cache, nova_cache, neutron_cache,
|
||||
setup_mock_caches, setup_quota_cache, FakeResource)
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class OpenstackAPITests(APITestCase):
|
||||
"""
|
||||
|
@ -321,7 +320,7 @@ class OpenstackAPITests(APITestCase):
|
|||
|
||||
|
||||
@mock.patch(
|
||||
'adjutant.actions.user_store.IdentityManager',
|
||||
'adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
@mock.patch(
|
||||
'adjutant.common.quota.get_novaclient',
|
||||
|
|
|
@ -20,12 +20,13 @@ from django.core import mail
|
|||
from rest_framework import status
|
||||
|
||||
from adjutant.api.models import Task, Token
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
AdjutantAPITestCase, modify_dict_settings)
|
||||
from adjutant.api.v1 import tests
|
||||
from adjutant.common.tests.fake_clients import FakeManager, setup_temp_cache
|
||||
from adjutant.common.tests import fake_clients
|
||||
from adjutant.common.tests.utils import (AdjutantAPITestCase,
|
||||
modify_dict_settings)
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class TaskViewTests(AdjutantAPITestCase):
|
||||
"""
|
||||
|
@ -110,7 +111,7 @@ class TaskViewTests(AdjutantAPITestCase):
|
|||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(len(mail.outbox), 2)
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'test@example.com')
|
||||
|
||||
def test_new_user_no_project(self):
|
||||
|
@ -314,7 +315,7 @@ class TaskViewTests(AdjutantAPITestCase):
|
|||
response.data,
|
||||
{'notes': ['created token']}
|
||||
)
|
||||
tests.temp_cache['projects'] = {}
|
||||
fake_clients.identity_temp_cache['projects'] = {}
|
||||
|
||||
new_token = Token.objects.all()[0]
|
||||
url = "/v1/tokens/" + new_token.token
|
||||
|
@ -1062,7 +1063,7 @@ class TaskViewTests(AdjutantAPITestCase):
|
|||
self.assertEqual(len(mail.outbox), 2)
|
||||
|
||||
self.assertEquals(
|
||||
tests.temp_cache['users']["user_id_1"].name,
|
||||
fake_clients.identity_temp_cache['users']["user_id_1"].name,
|
||||
'new_user')
|
||||
|
||||
@override_settings(USERNAME_IS_EMAIL=False)
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
from adjutant.actions.openstack_clients import (
|
||||
from adjutant.common.openstack_clients import (
|
||||
get_novaclient, get_cinderclient, get_neutronclient)
|
||||
|
||||
from django.conf import settings
|
||||
|
|
|
@ -0,0 +1,571 @@
|
|||
# Copyright (C) 2015 Catalyst IT Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
import mock
|
||||
|
||||
|
||||
identity_temp_cache = {}
|
||||
|
||||
neutron_cache = {}
|
||||
nova_cache = {}
|
||||
cinder_cache = {}
|
||||
|
||||
|
||||
def setup_temp_cache(projects, users):
|
||||
default_domain = mock.Mock()
|
||||
default_domain.id = 'default'
|
||||
default_domain.name = 'Default'
|
||||
|
||||
admin_user = mock.Mock()
|
||||
admin_user.id = 'user_id_0'
|
||||
admin_user.name = 'admin'
|
||||
admin_user.password = 'password'
|
||||
admin_user.email = 'admin@example.com'
|
||||
admin_user.domain = default_domain.id
|
||||
|
||||
users.update({admin_user.id: admin_user})
|
||||
|
||||
region_one = mock.Mock()
|
||||
region_one.id = 'RegionOne'
|
||||
region_one.name = 'RegionOne'
|
||||
|
||||
region_two = mock.Mock()
|
||||
region_two.id = 'RegionTwo'
|
||||
|
||||
global identity_temp_cache
|
||||
|
||||
# TODO(adriant): region and project keys are name, should be ID.
|
||||
identity_temp_cache = {
|
||||
'i': 1,
|
||||
'users': users,
|
||||
'projects': projects,
|
||||
'roles': {
|
||||
'_member_': '_member_',
|
||||
'admin': 'admin',
|
||||
'project_admin': 'project_admin',
|
||||
'project_mod': 'project_mod',
|
||||
'heat_stack_owner': 'heat_stack_owner'
|
||||
},
|
||||
'regions': {
|
||||
'RegionOne': region_one,
|
||||
'RegionTwo': region_two
|
||||
},
|
||||
'domains': {
|
||||
default_domain.id: default_domain,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class FakeManager(object):
|
||||
|
||||
def _project_from_id(self, project):
|
||||
if isinstance(project, mock.Mock):
|
||||
return project
|
||||
else:
|
||||
return self.get_project(project)
|
||||
|
||||
def _role_from_id(self, role):
|
||||
if isinstance(role, mock.Mock):
|
||||
return role
|
||||
else:
|
||||
return self.get_role(role)
|
||||
|
||||
def _user_from_id(self, user):
|
||||
if isinstance(user, mock.Mock):
|
||||
return user
|
||||
else:
|
||||
return self.get_user(user)
|
||||
|
||||
def _domain_from_id(self, domain):
|
||||
if isinstance(domain, mock.Mock):
|
||||
return domain
|
||||
else:
|
||||
return self.get_domain(domain)
|
||||
|
||||
def find_user(self, name, domain):
|
||||
domain = self._domain_from_id(domain)
|
||||
global identity_temp_cache
|
||||
for user in identity_temp_cache['users'].values():
|
||||
if user.name == name and user.domain == domain.id:
|
||||
return user
|
||||
return None
|
||||
|
||||
def get_user(self, user_id):
|
||||
global identity_temp_cache
|
||||
return identity_temp_cache['users'].get(user_id, None)
|
||||
|
||||
def list_users(self, project):
|
||||
project = self._project_from_id(project)
|
||||
global identity_temp_cache
|
||||
roles = identity_temp_cache['projects'][project.name].roles
|
||||
users = []
|
||||
|
||||
for user_id, user_roles in roles.items():
|
||||
user = self.get_user(user_id)
|
||||
user.roles = []
|
||||
|
||||
for role in user_roles:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
user.roles.append(r)
|
||||
|
||||
users.append(user)
|
||||
return users
|
||||
|
||||
def create_user(self, name, password, email, created_on,
|
||||
domain='default', default_project=None):
|
||||
domain = self._domain_from_id(domain)
|
||||
default_project = self._project_from_id(default_project)
|
||||
global identity_temp_cache
|
||||
user = mock.Mock()
|
||||
user.id = "user_id_%s" % int(identity_temp_cache['i'])
|
||||
user.name = name
|
||||
user.password = password
|
||||
user.email = email
|
||||
user.domain = domain.id
|
||||
user.default_project = default_project
|
||||
identity_temp_cache['users'][user.id] = user
|
||||
|
||||
identity_temp_cache['i'] += 0.5
|
||||
return user
|
||||
|
||||
def update_user_password(self, user, password):
|
||||
user = self._user_from_id(user)
|
||||
user.password = password
|
||||
|
||||
def update_user_name(self, user, username):
|
||||
user = self._user_from_id(user)
|
||||
user.name = username
|
||||
|
||||
def update_user_email(self, user, email):
|
||||
user = self._user_from_id(user)
|
||||
user.email = email
|
||||
|
||||
def enable_user(self, user):
|
||||
user = self._user_from_id(user)
|
||||
user.enabled = True
|
||||
|
||||
def disable_user(self, user):
|
||||
user = self._user_from_id(user)
|
||||
user.enabled = False
|
||||
|
||||
def find_role(self, name):
|
||||
global identity_temp_cache
|
||||
if identity_temp_cache['roles'].get(name, None):
|
||||
role = mock.Mock()
|
||||
role.name = name
|
||||
return role
|
||||
return None
|
||||
|
||||
def get_roles(self, user, project):
|
||||
user = self._user_from_id(user)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
roles = []
|
||||
for role in project.roles[user.id]:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
roles.append(r)
|
||||
return roles
|
||||
except KeyError:
|
||||
return []
|
||||
|
||||
def get_all_roles(self, user):
|
||||
user = self._user_from_id(user)
|
||||
global identity_temp_cache
|
||||
projects = {}
|
||||
for project in identity_temp_cache['projects'].values():
|
||||
projects[project.id] = []
|
||||
for role in project.roles[user.id]:
|
||||
r = mock.Mock()
|
||||
r.name = role
|
||||
projects[project.id].append(r)
|
||||
|
||||
return projects
|
||||
|
||||
def add_user_role(self, user, role, project):
|
||||
user = self._user_from_id(user)
|
||||
role = self._role_from_id(role)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
project.roles[user.id].append(role.name)
|
||||
except KeyError:
|
||||
project.roles[user.id] = [role.name]
|
||||
|
||||
def remove_user_role(self, user, role, project):
|
||||
user = self._user_from_id(user)
|
||||
role = self._role_from_id(role)
|
||||
project = self._project_from_id(project)
|
||||
try:
|
||||
project.roles[user.id].remove(role.name)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def find_project(self, project_name, domain):
|
||||
domain = self._domain_from_id(domain)
|
||||
global identity_temp_cache
|
||||
for project in identity_temp_cache['projects'].values():
|
||||
if project.name == project_name and project.domain == domain.id:
|
||||
return project
|
||||
return None
|
||||
|
||||
def get_project(self, project_id):
|
||||
global identity_temp_cache
|
||||
for project in identity_temp_cache['projects'].values():
|
||||
if project.id == project_id:
|
||||
return project
|
||||
|
||||
def create_project(self, project_name, created_on, parent=None,
|
||||
domain='default', p_id=None):
|
||||
parent = self._project_from_id(parent)
|
||||
domain = self._domain_from_id(domain)
|
||||
global identity_temp_cache
|
||||
project = mock.Mock()
|
||||
if p_id:
|
||||
project.id = p_id
|
||||
else:
|
||||
identity_temp_cache['i'] += 0.5
|
||||
project.id = "project_id_%s" % int(identity_temp_cache['i'])
|
||||
project.name = project_name
|
||||
if parent:
|
||||
project.parent = parent.id
|
||||
else:
|
||||
project.parent = domain.id
|
||||
project.domain = domain.id
|
||||
project.roles = {}
|
||||
identity_temp_cache['projects'][project_name] = project
|
||||
return project
|
||||
|
||||
def update_project(self, project, **kwargs):
|
||||
project = self._project_from_id(project)
|
||||
for key, arg in kwargs.items():
|
||||
if arg is not None:
|
||||
setattr(project, key, arg)
|
||||
return project
|
||||
|
||||
def find_domain(self, domain_name):
|
||||
global identity_temp_cache
|
||||
for domain in identity_temp_cache['domains'].values():
|
||||
if domain.name == domain_name:
|
||||
return domain
|
||||
return None
|
||||
|
||||
def get_domain(self, domain_id):
|
||||
global identity_temp_cache
|
||||
return identity_temp_cache['domains'].get(domain_id, None)
|
||||
|
||||
def get_region(self, region_id):
|
||||
global identity_temp_cache
|
||||
return identity_temp_cache['regions'].get(region_id, None)
|
||||
|
||||
def list_regions(self):
|
||||
global identity_temp_cache
|
||||
return identity_temp_cache['regions'].values()
|
||||
|
||||
|
||||
class FakeOpenstackClient(object):
|
||||
class Quotas(object):
|
||||
""" Stub class for testing quotas """
|
||||
def __init__(self, service):
|
||||
self.service = service
|
||||
|
||||
def update(self, project_id, **kwargs):
|
||||
self.service.update_quota(project_id, **kwargs)
|
||||
|
||||
def get(self, project_id):
|
||||
return self.QuotaSet(
|
||||
self.service._cache[self.service.region][project_id]['quota'])
|
||||
|
||||
class QuotaSet(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def to_dict(self):
|
||||
return self.data
|
||||
|
||||
def __init__(self, region, cache):
|
||||
self.region = region
|
||||
self._cache = cache
|
||||
self.quotas = FakeOpenstackClient.Quotas(self)
|
||||
|
||||
def update_quota(self, project_id, **kwargs):
|
||||
if self.region not in self._cache:
|
||||
self._cache[self.region] = {}
|
||||
if project_id not in self._cache[self.region]:
|
||||
self._cache[self.region][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
quota = self._cache[self.region][project_id]['quota']
|
||||
quota.update(kwargs)
|
||||
|
||||
|
||||
class FakeNeutronClient(object):
|
||||
|
||||
def __init__(self, region):
|
||||
self.region = region
|
||||
|
||||
def create_network(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['network']['tenant_id']
|
||||
net = {'network': {'id': 'net_id_%s' % neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
net_id = net['network']['id']
|
||||
neutron_cache['RegionOne'][project_id]['networks'][net_id] = net
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return net
|
||||
|
||||
def create_subnet(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['subnet']['tenant_id']
|
||||
subnet = {'subnet': {'id': 'subnet_id_%s'
|
||||
% neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
sub_id = subnet['subnet']['id']
|
||||
neutron_cache['RegionOne'][project_id]['subnets'][sub_id] = subnet
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return subnet
|
||||
|
||||
def create_router(self, body):
|
||||
global neutron_cache
|
||||
project_id = body['router']['tenant_id']
|
||||
router = {'router': {'id': 'router_id_%s'
|
||||
% neutron_cache['RegionOne']['i'],
|
||||
'body': body}}
|
||||
router_id = router['router']['id']
|
||||
neutron_cache['RegionOne'][project_id]['routers'][router_id] = router
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
return router
|
||||
|
||||
def add_interface_router(self, router_id, body):
|
||||
global neutron_cache
|
||||
port_id = "port_id_%s" % neutron_cache['RegionOne']['i']
|
||||
neutron_cache['RegionOne']['i'] += 1
|
||||
interface = {
|
||||
'port_id': port_id,
|
||||
'id': router_id,
|
||||
'subnet_id': body['subnet_id']}
|
||||
return interface
|
||||
|
||||
def update_quota(self, project_id, body):
|
||||
global neutron_cache
|
||||
if self.region not in neutron_cache:
|
||||
neutron_cache[self.region] = {}
|
||||
if project_id not in neutron_cache[self.region]:
|
||||
neutron_cache[self.region][project_id] = {}
|
||||
|
||||
if 'quota' not in neutron_cache[self.region][project_id]:
|
||||
neutron_cache[self.region][project_id]['quota'] = {}
|
||||
|
||||
quota = neutron_cache[self.region][project_id]['quota']
|
||||
quota.update(body['quota'])
|
||||
|
||||
def show_quota(self, project_id):
|
||||
return {"quota": neutron_cache[self.region][project_id]['quota']}
|
||||
|
||||
def list_networks(self, tenant_id):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_routers(self, tenant_id):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_subnets(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_security_groups(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_floatingips(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_security_group_rules(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
def list_ports(self, tenant_id=0):
|
||||
return neutron_cache[self.region][tenant_id]
|
||||
|
||||
|
||||
class FakeNovaClient(FakeOpenstackClient):
|
||||
|
||||
def __init__(self, region):
|
||||
global nova_cache
|
||||
super(FakeNovaClient, self).__init__(region, nova_cache)
|
||||
self.limits = self.LimitFakers(nova_cache[region])
|
||||
|
||||
class LimitFakers(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def get(self, tenant_id):
|
||||
return self.LimitFake(self.data, tenant_id)
|
||||
|
||||
class LimitFake(object):
|
||||
def __init__(self, data, project_id):
|
||||
self.project_id = project_id
|
||||
self.data = data
|
||||
|
||||
def to_dict(self):
|
||||
return self.data[self.project_id]
|
||||
|
||||
|
||||
class FakeCinderClient(FakeOpenstackClient):
|
||||
class FakeResourceGroup(object):
|
||||
""" Stub class to represent volumes and snapshots """
|
||||
|
||||
def __init__(self, region, cache_key):
|
||||
self.region = region
|
||||
self.key = cache_key
|
||||
|
||||
def list(self, search_opts=None):
|
||||
if search_opts:
|
||||
project_id = search_opts['project_id']
|
||||
global cinder_cache
|
||||
return cinder_cache[self.region][project_id][self.key]
|
||||
|
||||
def __init__(self, region):
|
||||
global cinder_cache
|
||||
self.region = region
|
||||
self._cache = cinder_cache
|
||||
self.quotas = FakeOpenstackClient.Quotas(self)
|
||||
self.volumes = self.FakeResourceGroup(region, 'volumes')
|
||||
self.volume_snapshots = self.FakeResourceGroup(region,
|
||||
'volume_snapshots')
|
||||
|
||||
|
||||
class FakeResource(object):
|
||||
""" Stub class to represent an individual instance of a volume or
|
||||
snapshot """
|
||||
|
||||
def __init__(self, size):
|
||||
self.size = size
|
||||
|
||||
|
||||
def setup_neutron_cache(region, project_id):
|
||||
global neutron_cache
|
||||
if region not in neutron_cache:
|
||||
neutron_cache[region] = {'i': 0}
|
||||
else:
|
||||
neutron_cache[region]['i'] = 0
|
||||
if project_id not in neutron_cache[region]:
|
||||
neutron_cache[region][project_id] = {}
|
||||
|
||||
neutron_cache[region][project_id] = {
|
||||
'networks': {},
|
||||
'subnets': {},
|
||||
'routers': {},
|
||||
'security_groups': {},
|
||||
'floatingips': {},
|
||||
'security_group_rules': {},
|
||||
'ports': {},
|
||||
}
|
||||
|
||||
neutron_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['neutron'])
|
||||
|
||||
|
||||
def setup_cinder_cache(region, project_id):
|
||||
global cinder_cache
|
||||
if region not in cinder_cache:
|
||||
cinder_cache[region] = {}
|
||||
if project_id not in cinder_cache[region]:
|
||||
cinder_cache[region][project_id] = {}
|
||||
|
||||
cinder_cache[region][project_id] = {
|
||||
'volumes': [],
|
||||
'volume_snapshots': [],
|
||||
}
|
||||
|
||||
cinder_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['cinder'])
|
||||
|
||||
|
||||
def setup_nova_cache(region, project_id):
|
||||
global nova_cache
|
||||
if region not in nova_cache:
|
||||
nova_cache[region] = {}
|
||||
if project_id not in nova_cache[region]:
|
||||
nova_cache[region][project_id] = {}
|
||||
|
||||
# Mocking the nova limits api
|
||||
nova_cache[region][project_id] = {
|
||||
'absolute': {
|
||||
"totalInstancesUsed": 0,
|
||||
"totalFloatingIpsUsed": 0,
|
||||
"totalRAMUsed": 0,
|
||||
"totalCoresUsed": 0,
|
||||
"totalSecurityGroupsUsed": 0
|
||||
}
|
||||
}
|
||||
nova_cache[region][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES['small']['nova'])
|
||||
|
||||
|
||||
def setup_quota_cache(region_name, project_id, size='small'):
|
||||
""" Sets up the quota cache for a given region and project """
|
||||
global cinder_cache
|
||||
|
||||
if region_name not in cinder_cache:
|
||||
cinder_cache[region_name] = {}
|
||||
|
||||
if project_id not in cinder_cache[region_name]:
|
||||
cinder_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
cinder_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['cinder'])
|
||||
|
||||
global nova_cache
|
||||
if region_name not in nova_cache:
|
||||
nova_cache[region_name] = {}
|
||||
|
||||
if project_id not in nova_cache[region_name]:
|
||||
nova_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
nova_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['nova'])
|
||||
|
||||
global neutron_cache
|
||||
if region_name not in neutron_cache:
|
||||
neutron_cache[region_name] = {}
|
||||
|
||||
if project_id not in neutron_cache[region_name]:
|
||||
neutron_cache[region_name][project_id] = {
|
||||
'quota': {}
|
||||
}
|
||||
|
||||
neutron_cache[region_name][project_id]['quota'] = dict(
|
||||
settings.PROJECT_QUOTA_SIZES[size]['neutron'])
|
||||
|
||||
|
||||
def setup_mock_caches(region, project_id):
|
||||
setup_nova_cache(region, project_id)
|
||||
setup_cinder_cache(region, project_id)
|
||||
setup_neutron_cache(region, project_id)
|
||||
|
||||
|
||||
def get_fake_neutron(region):
|
||||
return FakeNeutronClient(region)
|
||||
|
||||
|
||||
def get_fake_novaclient(region):
|
||||
return FakeNovaClient(region)
|
||||
|
||||
|
||||
def get_fake_cinderclient(region):
|
||||
global cinder_cache
|
||||
return FakeCinderClient(region)
|
|
@ -17,12 +17,14 @@ import mock
|
|||
from rest_framework import status
|
||||
|
||||
from adjutant.api.models import Token
|
||||
from adjutant.api.v1.tests import (FakeManager, setup_temp_cache,
|
||||
AdjutantAPITestCase, modify_dict_settings)
|
||||
from adjutant.common.tests.fake_clients import FakeManager, setup_temp_cache
|
||||
from adjutant.common.tests.utils import (AdjutantAPITestCase,
|
||||
modify_dict_settings)
|
||||
|
||||
from django.core import mail
|
||||
|
||||
|
||||
@mock.patch('adjutant.actions.user_store.IdentityManager',
|
||||
@mock.patch('adjutant.common.user_store.IdentityManager',
|
||||
FakeManager)
|
||||
class ModifySettingsTests(AdjutantAPITestCase):
|
||||
"""
|
|
@ -0,0 +1,216 @@
|
|||
# Copyright (C) 2015 Catalyst IT Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import copy
|
||||
|
||||
from django.conf import settings
|
||||
from django.test.utils import override_settings
|
||||
from django.test import TestCase
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
|
||||
class modify_dict_settings(override_settings):
|
||||
"""
|
||||
A decorator like djangos modify_settings and override_settings, but makes
|
||||
it possible to do those same operations on dict based settings.
|
||||
|
||||
The decorator will act after both override_settings and modify_settings.
|
||||
|
||||
Can be applied to test functions or AdjutantTestCase,
|
||||
AdjutantAPITestCase classes. In those two classes settings can also
|
||||
be modified using:
|
||||
|
||||
with self.modify_dict_settings(...):
|
||||
# code
|
||||
|
||||
Example Usage:
|
||||
@modify_dict_settings(ROLES_MAPPING=[
|
||||
{'key_list': ['project_mod'],
|
||||
'operation': 'remove',
|
||||
'value': 'heat_stack_owner'},
|
||||
{'key_list': ['project_admin'],
|
||||
'operation': 'append',
|
||||
'value': 'heat_stack_owner'},
|
||||
])
|
||||
or
|
||||
@modify_dict_settings(PROJECT_QUOTA_SIZES={
|
||||
'key_list': ['small', 'nova', 'instances'],
|
||||
'operations': 'override',
|
||||
'value': 11
|
||||
})
|
||||
|
||||
Available operations:
|
||||
Standard operations:
|
||||
- 'update': A dict on dict operation to update final dict with value.
|
||||
- 'override': Either overrides or adds the value to the dictionary.
|
||||
- 'delete': Removes the value from the dictionary.
|
||||
|
||||
List operations:
|
||||
List operations expect that the accessed value in the dictionary is a list.
|
||||
- 'append': Add the specified values to the end of the list
|
||||
- 'prepend': Add the specifed values to the start of the list
|
||||
- 'remove': Remove the specified values from the list
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if args:
|
||||
# Hack used when instantiating from SimpleTestCase.setUpClass.
|
||||
assert not kwargs
|
||||
self.operations = args[0]
|
||||
else:
|
||||
assert not args
|
||||
self.operations = list(kwargs.items())
|
||||
super(override_settings, self).__init__()
|
||||
|
||||
def save_options(self, test_func):
|
||||
if getattr(test_func, "_modified_dict_settings", None) is None:
|
||||
test_func._modified_dict_settings = self.operations
|
||||
else:
|
||||
# Duplicate list to prevent subclasses from altering their parent.
|
||||
test_func._modified_dict_settings = list(
|
||||
test_func._modified_dict_settings) + self.operations
|
||||
|
||||
def disable(self):
|
||||
self.wrapped = self._wrapped
|
||||
for update_dict in self.update_dicts:
|
||||
update_dict['pointer'].clear()
|
||||
update_dict['pointer'].update(update_dict['copy'])
|
||||
|
||||
super(modify_dict_settings, self).disable()
|
||||
|
||||
def enable(self):
|
||||
self.options = {}
|
||||
|
||||
self.update_dicts = []
|
||||
self._wrapped = copy.deepcopy(settings._wrapped)
|
||||
|
||||
for name, operation_list in self.operations:
|
||||
try:
|
||||
value = self.options[name]
|
||||
except KeyError:
|
||||
value = getattr(settings, name, [])
|
||||
|
||||
if not isinstance(value, dict):
|
||||
raise ValueError("Initial setting not dictionary.")
|
||||
|
||||
if not isinstance(operation_list, list):
|
||||
operation_list = [operation_list]
|
||||
|
||||
for operation in operation_list:
|
||||
op_type = operation['operation']
|
||||
|
||||
holding_dict = value
|
||||
|
||||
# Recursively find the dict we want
|
||||
key_len = len(operation['key_list'])
|
||||
final_key = operation['key_list'][0]
|
||||
|
||||
for i in range(key_len):
|
||||
current_key = operation['key_list'][i]
|
||||
if i == (key_len - 1):
|
||||
final_key = current_key
|
||||
else:
|
||||
try:
|
||||
holding_dict = holding_dict[current_key]
|
||||
except KeyError:
|
||||
holding_dict[current_key] = {}
|
||||
holding_dict = holding_dict[current_key]
|
||||
|
||||
if op_type == "override":
|
||||
holding_dict[final_key] = operation['value']
|
||||
elif op_type == "delete":
|
||||
del holding_dict[final_key]
|
||||
elif op_type == "update":
|
||||
# Needs to be saved seperately and update re-used on
|
||||
# disable due to pointers
|
||||
self.update_dicts.append(
|
||||
{'pointer': holding_dict[final_key],
|
||||
'copy': copy.deepcopy(holding_dict[final_key])})
|
||||
holding_dict[final_key].update(operation['value'])
|
||||
else:
|
||||
val = holding_dict.get(final_key, [])
|
||||
items = operation['value']
|
||||
|
||||
if not isinstance(items, list):
|
||||
items = [items]
|
||||
|
||||
if op_type == 'append':
|
||||
holding_dict[final_key] = val + [
|
||||
item for item in items if item not in val]
|
||||
elif op_type == 'prepend':
|
||||
holding_dict[final_key] = ([item for item in items if
|
||||
item not in val] + val)
|
||||
elif op_type == 'remove':
|
||||
holding_dict[final_key] = [
|
||||
item for item in val if item not in items]
|
||||
else:
|
||||
raise ValueError("Unsupported action: %s" % op_type)
|
||||
self.options[name] = value
|
||||
super(modify_dict_settings, self).enable()
|
||||
|
||||
|
||||
class TestCaseMixin(object):
|
||||
""" Mixin to add modify_dict_settings functions to test classes """
|
||||
|
||||
@classmethod
|
||||
def _apply_settings_changes(cls):
|
||||
if getattr(cls, '_modified_dict_settings', None):
|
||||
operations = {}
|
||||
for key, value in cls._modified_dict_settings:
|
||||
operations[key] = value
|
||||
cls._cls_modified_dict_context = modify_dict_settings(
|
||||
**operations)
|
||||
cls._cls_modified_dict_context.enable()
|
||||
|
||||
@classmethod
|
||||
def _remove_settings_changes(cls):
|
||||
if hasattr(cls, '_cls_modified_dict_context'):
|
||||
cls._cls_modified_dict_context.disable()
|
||||
delattr(cls, '_cls_modified_dict_context')
|
||||
|
||||
def modify_dict_settings(self, **kwargs):
|
||||
return modify_dict_settings(**kwargs)
|
||||
|
||||
|
||||
class AdjutantTestCase(TestCase, TestCaseMixin):
|
||||
"""
|
||||
TestCase override that has support for @modify_dict_settings as a
|
||||
class decorator and internal function
|
||||
"""
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(AdjutantTestCase, cls).setUpClass()
|
||||
cls._apply_settings_changes()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls._remove_settings_changes()
|
||||
super(AdjutantTestCase, cls).tearDownClass()
|
||||
|
||||
|
||||
class AdjutantAPITestCase(APITestCase, TestCaseMixin):
|
||||
"""
|
||||
APITestCase override that has support for @modify_dict_settings as a
|
||||
class decorator, and internal function
|
||||
"""
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(AdjutantAPITestCase, cls).setUpClass()
|
||||
cls._apply_settings_changes()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls._remove_settings_changes()
|
||||
super(AdjutantAPITestCase, cls).tearDownClass()
|
|
@ -18,7 +18,7 @@ from django.conf import settings
|
|||
|
||||
from keystoneclient import exceptions as ks_exceptions
|
||||
|
||||
from adjutant.actions.openstack_clients import get_keystoneclient
|
||||
from adjutant.common.openstack_clients import get_keystoneclient
|
||||
|
||||
|
||||
def get_managable_roles(user_roles):
|
Loading…
Reference in New Issue