More complete testing of the os plugins/utils (#24)

The change implements several fixes so that monitor stack can be run in
OpenStack environments supporting both V2 and V3 authentication.

All of the plugins now have a proper name lookup and will pull from a local
cache instead of hammering the API on every request.

The Local caching functionlaity used to be tied to the python shelve
module. Due to issues with Python 3.5 and shelves the library was
replaced with the diskcache lib which implements a caching interface
supporting both py2.7+.

Tests have been added in support of all additions.

Tests have been added for the os_utils module which should bring it
within ~98% of completeness.

Signed-off-by: Kevin Carter <kevin.carter@rackspace.com>
This commit is contained in:
Kevin Carter 2017-03-14 11:34:10 -05:00 committed by Major Hayden
parent 79d23cd713
commit 775f034a78
19 changed files with 741 additions and 176 deletions

2
.gitignore vendored
View File

@ -44,6 +44,8 @@ nosetests.xml
coverage.xml
*,cover
.hypothesis/
AUTHORS
ChangeLog
# Translations
*.mo

View File

@ -4,32 +4,32 @@
# override whatever is needed within the local sections.
[DEFAULT]
insecure = false
auth_url = https://example.com:5000/v3
# The verify option is for SSL. If your SSL certificate is not
# valid set this option to false else omit it or set it true.
insecure = true
auth_url = https://127.0.0.1:5000/v3
username = admin
password = Secrete
[keystone]
# NOTE(cloudnull):
# When using keystone V3 you will need the .*domain_name configuration options.
user_domain_name = default # This is required when Keystone V3 is being used
project_domain_name = default # This is required when Keystone V3 is being used
[glance]
# NOTE(cloudnull):
# If you're using keystone V2 you will need the tenant_name option.
tenant_name = admin # This is required when Keystone V2 is being used
project_name = admin # This is required when Keystone V2 is being used
# NEVER Mix and match the options tenant name and domain_name options.
# NEVER Mix and match the options tenant name and domain_name options withiin the same section.
# You are be required to run either V2 or V3 as it pertains to this config.
# If you provide both tenant_name and .*domain_name options at the same time
# the plugins will fail API version negotiation.
username = admin
password = Secrete
# The verify option is for SSL. If your SSL certificate is not
# valid set this option to false else omit it or set it true.
verify = false
[keystone]
[glance]
[nova]
project_name = nova
@ -40,7 +40,7 @@ project_name = nova
[cinder]
[ironic]
auth_url = https://example2.com:5000/v3
auth_url = https://127.0.1.1:5000/v3
project_name = ironic
user_domain_name = users
project_domain_name = projects

View File

@ -18,14 +18,15 @@ import socket
import click
from monitorstack import utils
from monitorstack.cli import pass_context
DOC = """Get metrics from a KVM hypervisor."""
COMMAND = 'kvm'
COMMAND_NAME = 'kvm'
@click.command(COMMAND, short_help=DOC.split('\n')[0])
@click.command(COMMAND_NAME, short_help=DOC.split('\n')[0])
@pass_context
def cli(ctx):
"""Get metrics from a KVM hypervisor."""
@ -60,7 +61,10 @@ def cli(ctx):
except Exception as exp:
output['exit_code'] = 1
output['message'] = 'kvm failed -- Error: {}'.format(exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = 'kvm is ok'

View File

@ -53,7 +53,10 @@ def cli(ctx, config_file):
variables[project.name] = int(limits['quota_set']['cores'])
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -53,7 +53,10 @@ def cli(ctx, config_file):
variables[project.name] = int(limits['quota_set']['instances'])
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -53,7 +53,10 @@ def cli(ctx, config_file):
variables[project.name] = int(limits['quota_set']['ram'])
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -51,13 +51,19 @@ def cli(ctx, config_file):
variables = output['variables']
for used in _ost.get_consumer_usage():
flavor = flavors[used['flavor']['id']]
used_collection[used['name']] += int(flavor['vcpus'])
output['meta'][used['flavor']['id']] = True
output['meta'][used['flavor']['name']] = True
project_name = _ost.get_project_name(project_id=used['project_id'])
used_collection[project_name] += int(flavor['vcpus'])
flavor_id = used['flavor']['id']
output['meta'][flavor_id] = True
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
output['meta'][flavor_name] = True
variables.update(used_collection)
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -51,13 +51,19 @@ def cli(ctx, config_file):
variables = output['variables']
for used in _ost.get_consumer_usage():
flavor = flavors[used['flavor']['id']]
used_collection[used['name']] += int(flavor['disk'])
output['meta'][used['flavor']['id']] = True
output['meta'][used['flavor']['name']] = True
project_name = _ost.get_project_name(project_id=used['project_id'])
used_collection[project_name] += int(flavor['disk'])
flavor_id = used['flavor']['id']
output['meta'][flavor_id] = True
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
output['meta'][flavor_name] = True
variables.update(used_collection)
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -49,11 +49,19 @@ def cli(ctx, config_file):
try:
variables = output['variables']
for used in _ost.get_consumer_usage():
used_collection[used['name']] += 1
project_name = _ost.get_project_name(project_id=used['project_id'])
used_collection[project_name] += 1
flavor_id = used['flavor']['id']
output['meta'][flavor_id] = True
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
output['meta'][flavor_name] = True
variables.update(used_collection)
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -51,13 +51,19 @@ def cli(ctx, config_file):
variables = output['variables']
for used in _ost.get_consumer_usage():
flavor = flavors[used['flavor']['id']]
used_collection[used['name']] += int(flavor['ram'])
output['meta'][used['flavor']['id']] = True
output['meta'][used['flavor']['name']] = True
project_name = _ost.get_project_name(project_id=used['project_id'])
used_collection[project_name] += int(flavor['ram'])
flavor_id = used['flavor']['id']
output['meta'][flavor_id] = True
flavor_name = _ost.get_flavor_name(flavor_id=flavor_id)
output['meta'][flavor_name] = True
variables.update(used_collection)
except Exception as exp:
output['exit_code'] = 1
output['message'] = '{} failed -- Error: {}'.format(COMMAND_NAME, exp)
output['message'] = '{} failed -- {}'.format(
COMMAND_NAME,
utils.log_exception(exp=exp)
)
else:
output['exit_code'] = 0
output['message'] = '{} is ok'.format(COMMAND_NAME)

View File

@ -13,12 +13,12 @@
# limitations under the License.
"""Common code for utils."""
import functools
import os
import shelve
import sys
import tempfile
import time
import traceback
# Lower import to support conditional configuration parser
try:
if sys.version_info > (3, 2, 0):
import configparser as ConfigParser
@ -27,9 +27,50 @@ try:
except ImportError:
raise SystemExit('No configparser module was found.')
import diskcache
def retry(ExceptionToCheck, tries=3, delay=1, backoff=1): # noqa
"""Retry calling the decorated function using an exponential backoff.
Attributes to sources of inspiration:
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
:param ExceptionToCheck: the exception to check. may be a tuple of
exceptions to check
:type ExceptionToCheck: Exception or tuple
:param tries: number of times to try (not retry) before giving up
:type tries: int
:param delay: initial delay between retries in seconds
:type delay: int
:param backoff: backoff multiplier e.g. value of 2 will double the delay
each retry
:type backoff: int
"""
def deco_retry(f):
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except ExceptionToCheck:
time.sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)
return f_retry # true decorator
return deco_retry
def is_int(value):
"""Check if a variable is an integer."""
"""Check if a variable is an integer.
:param value: parameter to evaluate and return
:type value: str || int || float
:returns: str || int || float
"""
for v_type in [int, float]:
try:
value = v_type(value)
@ -42,43 +83,68 @@ def is_int(value):
class LocalCache(object):
"""Context Manager for opening and closing access to the DBM."""
"""Context Manager for opening and closing access to the cache objects."""
def __init__(self):
"""Initialization method for class."""
"""Set the Path to the DBM to create/Open."""
def __init__(self, cache_path=None):
"""Set the Path cache object.
self.db_cache = os.path.join(
tempfile.gettempdir(),
'monitorstack.openstack.dbm'
)
:param cache_file: File path to store cache
:type cache_file: str
"""
# If a cache file is provided use it otherwise store one in
# the user home folder as a hidden folder.
self.cache_path = cache_path
if not self.cache_path:
self.cache_path = os.path.join(
os.path.expanduser('~'),
'.monitorstack.cache'
)
elif not self.cache_path.endswith('cache'):
self.cache_path = '{}.cache'.format(self.cache_path)
if not os.path.isdir(self.cache_path):
os.makedirs(self.cache_path)
def __enter__(self):
"""Open the DBM in r/w mode.
"""Open the cache object.
:return: Open DBM
:returns: object
"""
return self.open_shelve
return self.open_cache
def __exit__(self, type, value, traceback):
"""Close DBM Connection."""
self.close_shelve()
def _open_shelve(self):
return shelve.open(self.db_cache)
def __exit__(self, *args, **kwargs):
"""Close cache object."""
self.lc_close()
@property
def open_shelve(self):
"""Open shelved data."""
return self._open_shelve()
@retry(ExceptionToCheck=Exception)
def open_cache(self):
"""Return open caching opbject.
def close_shelve(self):
:returns: object
"""
return diskcache.Cache(directory=self.cache_path)
def lc_open(self):
"""Open shelved data.
:param cache_file: File path to store cache
:type cache_file: str
:returns: object
"""
return self.open_cache
def lc_close(self):
"""Close shelved data."""
self.open_shelve.close()
self.open_cache.close()
def read_config(config_file):
"""Read an OpenStack configuration."""
"""Read an OpenStack configuration.
:param config_file: path to configuration file.
:type config_file: str
"""
cfg = os.path.abspath(os.path.expanduser(config_file))
if not os.path.isfile(cfg):
raise IOError('Config file "{}" was not found'.format(cfg))
@ -89,11 +155,22 @@ def read_config(config_file):
args = dict()
defaults = dict([(k, v) for k, v in parser.items(section='DEFAULT')])
for section in parser.sections():
if section == 'DEFAULT':
continue
sec = args[section] = defaults
for key, value in parser.items(section):
sec[key] = is_int(value=value)
return args
def log_exception(exp):
"""Return log entries.
:param exp: Exception object or name.
:type exp: str || object
:return: str
"""
_trace = [i.strip() for i in str(traceback.format_exc()).splitlines()]
trace = ' -> '.join(_trace)
_exception = [i.strip() for i in str(exp).splitlines()]
exception = ' -> '.join(_exception)
return 'Exception [ %s ]: Trace: [ %s ]' % (exception, trace)

View File

@ -15,7 +15,6 @@
try:
from openstack import connection as os_conn
from openstack import exceptions as os_exp
except ImportError as e:
raise SystemExit('OpenStack plugins require access to the OpenStackSDK.'
' Please install "python-openstacksdk".'
@ -28,16 +27,32 @@ class OpenStack(object):
"""Class for reusable OpenStack utility methods."""
def __init__(self, os_auth_args):
"""Initialization method for class."""
"""Initialization method for class.
:param os_auth_args: dict containing auth creds.
:type os_auth_args: dict
"""
self.os_auth_args = os_auth_args
@property
def conn(self):
"""Return an OpenStackSDK connection."""
"""Return an OpenStackSDK connection.
:returns: object
"""
return os_conn.Connection(**self.os_auth_args)
def get_consumer_usage(self, servers=None, marker=None, limit=512):
"""Retrieve current usage by an OpenStack cloud consumer."""
"""Retrieve current usage by an OpenStack cloud consumer.
:param servers: ID of a given project to lookup.
:type servers: str || uuid
:param marker: ID of last server seen.
:type marker: str || uuid
:param limit: Number of items a single API call can return.
:type limit: int
:returns: list
"""
tenant_kwargs = {'details': True, 'all_tenants': True, 'limit': limit}
if not servers:
servers = list()
@ -47,54 +62,123 @@ class OpenStack(object):
count = 0
for server in self.conn.compute.servers(**tenant_kwargs):
servers.append(server)
servers.append(server.to_dict())
count += 1
if count == limit:
return self.get_consumer_usage(
servers=servers,
marker=servers[-1].id
)
if count == limit:
return self.get_consumer_usage(
servers=servers,
marker=servers[-1]['id']
)
return servers
def get_compute_limits(self, project_id, interface='internal'):
"""Determine limits of compute resources."""
url = self.conn.compute.session.get_endpoint(
"""Return compute resource limits for a project.
:param project_id: ID of a given project to lookup.
:type project_id: str || uuid
:param interface: Interface name, normally [internal, public, admin].
:type interface: str
:returns: dict
"""
url = self.conn.session.get_endpoint(
interface=interface,
service_type='compute'
)
quota_data = self.conn.compute.session.get(
quota_data = self.conn.session.get(
url + '/os-quota-sets/' + project_id
)
return quota_data.json()
def get_project_name(self, project_id):
"""Retrieve the name of a project."""
with utils.LocalCache() as c:
try:
project_name = c.get(project_id)
if not project_name:
project_info = self.conn.identity.get_project(project_id)
project_name = c[project_info.id] = project_info.name
except os_exp.ResourceNotFound:
return None
else:
return project_name
def get_projects(self):
"""Retrieve a list of projects."""
"""Retrieve a list of projects.
:returns: list
"""
_consumers = list()
with utils.LocalCache() as c:
for project in self.conn.identity.projects():
_consumers.append(project)
c[project.id] = project.name
cache_key = 'projects_' + str(project.id)
c.set(
cache_key,
project.to_dict(),
expire=43200,
tag='projects'
)
return _consumers
def get_project(self, project_id):
"""Retrieve project data.
:param project_id: ID of a given project to lookup.
:type project_id: str || uuid
:returns: dict
"""
project = None
cache_key = 'projects_{}'.format(project_id)
with utils.LocalCache() as c:
try:
project = c.get(cache_key)
if not project:
raise LookupError
except LookupError:
project_info = self.conn.identity.get_project(project_id)
project = project_info.to_dict()
c.set(cache_key, project, expire=43200, tag='projects')
finally:
return project
def get_project_name(self, project_id):
"""Retrieve the name of a project."""
return self.get_project(project_id=project_id)['name']
def get_flavors(self):
"""Retrieve a list of flavors."""
flavor_cache = dict()
for flavor in self.conn.compute.flavors():
entry = flavor_cache[flavor['id']] = dict()
entry.update(flavor)
return flavor_cache
"""Retrieve all of flavors.
:returns: dict
"""
flavors = dict()
with utils.LocalCache() as c:
for flavor in self.conn.compute.flavors():
_flavor = flavor.to_dict()
cache_key = 'flavor_' + str(flavor.id)
c.set(
cache_key,
_flavor,
expire=43200,
tag='flavors'
)
entry = flavors[flavor.id] = dict()
entry.update(_flavor)
return flavors
def get_flavor(self, flavor_id):
"""Retrieve a flavor.
:param flavor_id: ID of a given flavor to lookup.
:type flavor_id: int || str
:returns: dict
"""
flavor = None
cache_key = 'flavor_{}'.format(flavor_id)
with utils.LocalCache() as c:
try:
flavor = c.get(cache_key)
if not flavor:
raise LookupError
except LookupError:
flavor_info = self.conn.compute.get_flavor(flavor_id)
flavor = flavor_info.to_dict()
c.set(cache_key, flavor, expire=43200, tag='flavors')
finally:
return flavor
def get_flavor_name(self, flavor_id):
"""Retrieve the name of a flavor.
:param flavor_id: ID of a given flavor to lookup.
:type flavor_id: int || str
:returns: str
"""
return self.get_flavor(flavor_id=flavor_id)['name']

View File

@ -1,4 +1,5 @@
click
diskcache
openstacksdk>=0.9.14
psutil>=5.2.0
six

View File

@ -13,3 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This an __init__.py."""
import os
from monitorstack import utils
def read_config():
"""Load the test config file."""
os_config_file = os.path.expanduser(
os.path.abspath(
os.path.dirname(__file__) + '/files/test-openstack.ini'
)
)
return utils.read_config(os_config_file)

View File

@ -1,47 +0,0 @@
# Store the authentication credentials needed to query a given OpenStack Service.
# All sections are overrides for the defaults. If you only need to connect to a
# single cloud simply store the credentials needd in the DEFAULT section and
# override whatever is needed within the local sections.
[DEFAULT]
insecure = false
auth_url = https://localhost:5000/v3
# NOTE(cloudnull):
# When using keystone V3 you will need the .*domain_name configuration options.
user_domain_name = default # This is required when Keystone V3 is being used
project_domain_name = default # This is required when Keystone V3 is being used
# If you're using keystone V2 you will need the tenant_name option.
tenant_name = admin # This is required when Keystone V2 is being used
# NEVER Mix and match the options tenant name and domain_name options.
# You are be required to run either V2 or V3 as it pertains to this config.
# If you provide both tenant_name and .*domain_name options at the same time
# the plugins will fail API version negotiation.
username = admin
password = Secrete
# The verify option is for SSL. If your SSL certificate is not
# valid set this option to false else omit it or set it true.
verify = false
[keystone]
[glance]
[nova]
project_name = nova
[neutron]
[heat]
[cinder]
[ironic]
auth_url = https://localhost:5000/v3
project_name = ironic
user_domain_name = users
project_domain_name = projects
password = SuperSecrete

View File

@ -0,0 +1 @@
../../etc/openstack.ini

246
tests/test_os_utils.py Normal file
View File

@ -0,0 +1,246 @@
# Copyright 2017, Major Hayden <major@mhtx.net>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the os_utils plugin."""
import unittest
import mock
from monitorstack.utils import os_utils
import tests # Import the test base module
class OpenStackObject(object):
"""Mocked server object."""
def __init__(self, id=None, name=None):
"""Mocked server class."""
self.id = id
self.name = name
def to_dict(self):
"""Mocked dict return."""
return {
'id': self.id,
'name': self.name
}
class MockedOpenStackConn(object):
"""Mocked OpenStack Connection object."""
class compute(object): # noqa
"""Mocked compute class."""
@staticmethod
def servers(*args, **kwargs):
"""Mocked servers method."""
servers = [
OpenStackObject(1, 'test1'),
OpenStackObject(2, 'test2'),
OpenStackObject(3, 'test3'),
OpenStackObject(4, 'test4'),
OpenStackObject(5, 'test5')
]
if 'marker' in kwargs:
for server in servers:
if server.id == kwargs['marker']:
index = servers.index(server)
servers.pop(index)
return servers[index:]
return servers
@staticmethod
def flavors():
"""Mocked flavors return."""
return [
OpenStackObject(1, 'test1'),
OpenStackObject(2, 'test2'),
OpenStackObject(3, 'test3'),
OpenStackObject(4, 'test4'),
OpenStackObject(5, 'test5')
]
@staticmethod
def get_flavor(flavor_id):
"""Return mocked flavor object."""
return OpenStackObject(
flavor_id,
'test_{}'.format(flavor_id)
)
class identity(object): # noqa
"""Mocked identity object."""
@staticmethod
def projects():
"""Mocked projects return."""
return [
OpenStackObject(1, 'test1'),
OpenStackObject(2, 'test2'),
OpenStackObject(3, 'test3'),
OpenStackObject(4, 'test4'),
OpenStackObject(5, 'test5')
]
@staticmethod
def get_project(project_id):
"""Return mocked project object."""
return OpenStackObject(
project_id,
'test_{}'.format(project_id)
)
class session(object): # noqa
"""Mocked session object."""
@staticmethod
def get_endpoint(interface, service_type):
"""Mocked endpoint return."""
return "https://127.0.1.1/{}/{}".format(interface, service_type)
@staticmethod
def get(url):
"""Mocked get return."""
class SessionGet(object):
"""Mocked session object."""
def __init__(self, url):
"""Mocked session get."""
self.url = url
def json(self):
"""Mocked json return."""
return {'url': self.url}
return SessionGet(url=url)
class TestOSUtilsConnection(unittest.TestCase):
"""Tests for the utilities."""
def test_conn(self):
"""Test the OpenStack connection interface."""
# load the base class for these tests.
self.osu = os_utils.OpenStack(
os_auth_args=tests.read_config()['keystone']
)
self.assertTrue(
isinstance(
self.osu.conn,
os_utils.os_conn.Connection
)
)
class TestOsUtils(unittest.TestCase):
"""Tests for the utilities."""
def setUp(self):
"""Setup the test."""
# load the base class for these tests.
self.osu = os_utils.OpenStack(
os_auth_args=tests.read_config()['keystone']
)
def tearDown(self):
"""Tear down the test."""
pass
def test_get_consumer_usage(self):
"""Test retrieving consumer usage."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
self.assertTrue(isinstance(self.osu.get_consumer_usage(), list))
def test_get_consumer_usage_with_servers(self):
"""Test retrieving consumer usage with servers list."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
servers = self.osu.get_consumer_usage(
servers=[OpenStackObject(0, 'test0').to_dict()]
)
self.assertEquals(len(servers), 6)
def test_get_consumer_usage_with_marker(self):
"""Test retrieving consumer usage."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
servers = self.osu.get_consumer_usage(marker=5)
self.assertEquals(len(servers), 0)
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
servers = self.osu.get_consumer_usage(marker=2)
self.assertEquals(len(servers), 3)
def test_get_consumer_usage_with_limit(self):
"""Test retrieving consumer usage."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
servers = self.osu.get_consumer_usage(limit=1)
self.assertEquals(len(servers), 5)
def test_get_compute_limits(self):
"""Test retrieving consumer limits."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
limits = self.osu.get_compute_limits(project_id='not-a-uuid')
u = 'https://127.0.1.1/internal/compute/os-quota-sets/not-a-uuid'
self.assertEquals(limits, {'url': u})
def test_get_projects(self):
"""Test retrieving project list."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
projects = self.osu.get_projects()
self.assertEquals(len(projects), 5)
def test_get_project(self):
"""Test retrieving project dict."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
project = self.osu.get_project(project_id='12345')
self.assertEquals(project['id'], '12345')
self.assertEquals(project['name'], 'test_12345')
def test_get_project_name(self):
"""Test retrieving project name."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
project_name = self.osu.get_project_name(project_id='12345')
self.assertEquals(project_name, 'test_12345')
def test_get_flavors(self):
"""Test retrieving flavors dict."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
servers = self.osu.get_flavors()
self.assertEquals(len(servers), 5)
def test_get_flavor(self):
"""Test retrieving flavor dict."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
flavor = self.osu.get_flavor(flavor_id=12345)
self.assertEquals(flavor['id'], 12345)
self.assertEquals(flavor['name'], 'test_12345')
def test_get_flavor_name(self):
"""Test retrieving flavor name."""
with mock.patch('openstack.connection.Connection') as MockClass:
MockClass.return_value = MockedOpenStackConn()
flavor_name = self.osu.get_flavor_name(flavor_id=12345)
self.assertEquals(flavor_name, 'test_12345')

View File

@ -15,12 +15,22 @@
import json
import sys
import unittest
from click.testing import CliRunner
from monitorstack.cli import cli
def _runner(module):
runner = CliRunner()
result = runner.invoke(cli, ['-f', 'json', module])
try:
return json.loads(result.output)
except Exception:
return result.exception
class LibvirtStub(object):
"""Stubbed libvirt class."""
@ -38,26 +48,44 @@ class LibvirtStub(object):
class lookupByID(object): # noqa
"""Stubbed lookupByID class."""
def __init__(self, *args, **kwargs): # noqa
def __init__(self, *args, **kwargs): # noqa
pass
def maxVcpus(self): # noqa
return 2
class TestKvm(object):
class LibvirtStubFailed(object):
"""Stubbed libvirt class."""
class openReadOnly(object): # noqa
"""Stubbed openReadOnly class."""
def close(self, *args, **kwargs): # noqa
pass
def listDomainsID(self, *args, **kwargs): # noqa
raise RuntimeError('Failed')
class TestKvm(unittest.TestCase):
"""Tests for the kvm monitor."""
def test_run(self):
def setUp(self):
"""Setup teardown."""
self.orig_libvirt = sys.modules.pop('libvirt', None)
sys.modules['libvirt'] = LibvirtStub()
def tearDown(self):
"""Teardown method."""
if self.orig_libvirt:
sys.modules['libvirt'] = self.orig_libvirt
def test_run_success(self):
"""Ensure the run() method works."""
sys.modules['libvirt'] = LibvirtStub
runner = CliRunner()
result = runner.invoke(cli, ['-f', 'json', 'kvm'])
result_json = json.loads(result.output)
variables = result_json['variables']
meta = result_json['meta']
result = _runner('kvm')
variables = result['variables']
meta = result['meta']
assert 'kvm_vms' in variables
assert variables['kvm_vms'] == 3
assert 'kvm_total_vcpus' in variables
@ -66,4 +94,17 @@ class TestKvm(object):
assert variables['kvm_scheduled_vcpus'] == 6
assert 'platform' in meta
assert 'kvm_host_id' in meta
assert result.exit_code == 0
assert result['exit_code'] == 0
def test_run_failure_no_libvirt(self):
"""Ensure the run() method works."""
sys.modules.pop('libvirt', None)
result = _runner('kvm')
self.assertTrue(isinstance(result, SystemExit))
def test_run_failure(self):
"""Ensure the run() method works."""
sys.modules['libvirt'] = LibvirtStubFailed()
result = _runner('kvm')
assert result['measurement_name'] == 'kvm'
assert result['exit_code'] == 1

View File

@ -38,20 +38,22 @@ class MockProject(object):
"""Mock init."""
self.id = 'testing'
self.name = 'testing'
self.project_id = 12345
def mock_get_consumer_usage(self):
def mock_get_consumer_usage(*args, **kwargs):
"""Mocked get_consumer_usage()."""
return [{
'name': 'test_name',
'project_id': 12345,
'flavor': {
'id': 1,
'name': 'flavor_one',
'name': 'flavor_one'
}
}]
def mock_get_flavors(self):
def mock_get_flavors(*args, **kwargs):
"""Mocked get_flavors()."""
return {
1: {
@ -63,13 +65,27 @@ def mock_get_flavors(self):
}
def mock_get_projects(arg1):
def mock_get_flavor(*args, **kwargs):
"""Mocked get_flavor(id)."""
return {
'name': 'flavor_one',
'vcpus': 2,
'disk': 10,
'ram': 1024,
}
def mock_get_project_name(*args, **kwargs):
"""Mocked get_projects()."""
projects = MockProject()
return [projects]
return 'test_name'
def mock_get_compute_limits(self, project_id, interface):
def mock_get_projects(*args, **kwargs):
"""Mocked get_projects()."""
return [MockProject()]
def mock_get_compute_limits(*args, **kwargs):
"""Mocked get_compute_limits()."""
return {
'quota_set': {
@ -131,6 +147,8 @@ class TestOs(object):
def test_os_vm_used_cores_success(self, monkeypatch):
"""Ensure os_vm_used_cores method works with success."""
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
result = _runner('os_vm_used_cores')
@ -147,6 +165,8 @@ class TestOs(object):
def test_os_vm_used_disk_success(self, monkeypatch):
"""Ensure os_vm_used_disk method works with success."""
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
result = _runner('os_vm_used_disk')
@ -162,6 +182,9 @@ class TestOs(object):
def test_os_vm_used_instance_success(self, monkeypatch):
"""Ensure os_vm_used_instance method works with success."""
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
result = _runner('os_vm_used_instance')
@ -178,6 +201,8 @@ class TestOs(object):
def test_os_vm_used_ram_success(self, monkeypatch):
"""Ensure os_vm_used_ram method works with success."""
monkeypatch.setattr(Ost, 'get_flavors', mock_get_flavors)
monkeypatch.setattr(Ost, 'get_flavor', mock_get_flavor)
monkeypatch.setattr(Ost, 'get_project_name', mock_get_project_name)
monkeypatch.setattr(Ost, 'get_consumer_usage', mock_get_consumer_usage)
result = _runner('os_vm_used_ram')

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the uptime plugin."""
"""Tests for the utils."""
import os
import tempfile
@ -26,21 +26,27 @@ class TestUtils(unittest.TestCase):
def setUp(self):
"""Initial setup for class."""
os_config_file = os.path.expanduser(
os.path.abspath(__file__ + '/../../etc/openstack.ini')
os.path.abspath(
os.path.dirname(__file__) + '/files/test-openstack.ini'
)
)
self.config = utils.read_config(os_config_file)
conf = utils.ConfigParser.RawConfigParser()
conf.read([os_config_file])
self.config_defaults = conf.defaults()
self.g_testfile = os.path.join(
os.path.expanduser('~'),
'.monitorstack.cache'
)
self.t_testfile = tempfile.mkdtemp()
def tearDown(self):
"""Destroy the local cache."""
local_cache = os.path.join(
tempfile.gettempdir(),
'monitorstack.openstack.dbm'
)
if os.path.exists(local_cache):
os.remove(local_cache)
for f in [self.g_testfile, self.t_testfile]:
cache_db = os.path.join(f, 'cache.db')
if os.path.exists(cache_db):
os.remove(cache_db)
def test_is_int_is_int(self): # noqa
self.assertTrue(isinstance(utils.is_int(value=1), int))
@ -67,8 +73,84 @@ class TestUtils(unittest.TestCase):
for key in self.config_defaults.keys():
self.assertTrue(key in v.keys())
def test_local_cache(self):
def test_local_cache_no_file(self):
"""Test local cache."""
with utils.LocalCache() as c:
c['test_key'] = True
self.assertTrue('test_key' in c)
c['test_key1'] = True
self.assertTrue('test_key1' in c)
def test_local_cache_file(self):
"""Test local cache."""
with utils.LocalCache(cache_path=self.t_testfile) as c:
c['test_key2'] = True
self.assertTrue('test_key2' in c)
def test_local_cache_no_file_no_context(self):
"""Test local cache without a context manager."""
c = utils.LocalCache()
cache = c.lc_open()
cache['test_key3'] = True
try:
self.assertTrue('test_key3' in cache)
finally:
c.lc_close()
with utils.LocalCache() as c:
self.assertTrue('test_key3' in c)
def test_local_cache_file_no_context(self):
"""Test local cache without a context manager."""
c = utils.LocalCache(cache_path=self.t_testfile)
cache = c.lc_open()
cache['test_key4'] = True
try:
self.assertTrue('test_key4' in cache)
finally:
c.lc_close()
with utils.LocalCache(cache_path=self.t_testfile) as c:
self.assertTrue('test_key4' in c)
def test_local_cache_no_load(self):
"""Test local cache without loading anything."""
c = utils.LocalCache(cache_path=self.t_testfile)
c.lc_close()
def test_local_cache_named_ext(self):
"""Test local cache without loading anything with a named extension."""
utils.LocalCache(cache_path='{}.cache'.format(self.t_testfile))
def test_retry_failure(self):
"""Test retry decorator for failure."""
@utils.retry(ExceptionToCheck=BaseException, tries=3, backoff=0,
delay=1)
def _failed():
"""Raise failure exception after retry."""
raise BaseException
self.assertRaises(BaseException, _failed)
def test_retry_success(self):
"""Test retry decorator for success."""
@utils.retry(ExceptionToCheck=BaseException, tries=3, backoff=0,
delay=1)
def _success():
"""Return True after retry."""
self.count += 1
if self.count == 3:
return True
else:
raise BaseException
self.count = 0
self.assertEquals(_success(), True)
def test_log_exception(self):
"""Test traceback formatter for exception messages."""
try:
raise Exception('test-exception')
except Exception as exp:
message = utils.log_exception(exp=exp)
self.assertTrue('Exception' in message)
self.assertTrue('Trace' in message)