Merge pull request #5 from ajkavanagh/feature/remove-python-code

Feature/remove python code
This commit is contained in:
gnuoy 2016-05-25 16:29:17 +01:00
commit 36230b4d7d
17 changed files with 10 additions and 1155 deletions

View File

@ -1,8 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./unit_tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,11 +0,0 @@
#!/usr/bin/make
PYTHON := /usr/bin/env python
clean:
@rm -rf .testrepository .unit-state.db .tox
lint:
@tox -e pep8
test:
@echo Starting unit tests...
@tox -e py27

7
README.md Normal file
View File

@ -0,0 +1,7 @@
# Where is the code?
The code for this module is now held in the charm.openstack module which is
available at the following locations:
- TODO PyPi
- TODO github.com/openstack-charms/charm.openstack

View File

@ -1,7 +1,3 @@
includes: ['layer:basic']
ignore:
- 'unit_tests'
- 'Makefile'
- '.testr.conf'
- 'test-requirements.txt'
- 'tox.ini'
- 'README.md'

View File

View File

@ -1,199 +0,0 @@
"""Adapter classes and utilities for use with Reactive interfaces"""
from charmhelpers.core import hookenv
class OpenStackRelationAdapter(object):
"""
Base adapter class for all OpenStack related adapters.
"""
interface_type = None
"""
The generic type of the interface the adapter is wrapping.
"""
def __init__(self, relation, accessors=None):
self.relation = relation
self.accessors = accessors or []
self._setup_properties()
@property
def relation_name(self):
"""
Name of the relation this adapter is handling.
"""
return self.relation.relation_name
def _setup_properties(self):
"""
Setup property based accessors for an interfaces
auto accessors
Note that the accessor is dynamic as each access calls the underlying
getattr() for each property access.
"""
self.accessors.extend(self.relation.auto_accessors)
for field in self.accessors:
meth_name = field.replace('-', '_')
# Get the relation property dynamically
# Note the additional lambda name: is to create a closure over
# meth_name so that a new 'name' gets created for each loop,
# otherwise the same variable meth_name is referenced in each of
# the internal lambdas. i.e. this is (lambda x: ...)(value)
setattr(self.__class__,
meth_name,
(lambda name: property(
lambda self: getattr(
self.relation, name)()))(meth_name))
class RabbitMQRelationAdapter(OpenStackRelationAdapter):
"""
Adapter for the RabbitMQRequires relation interface.
"""
interface_type = "messaging"
def __init__(self, relation):
add_accessors = ['vhost', 'username']
super(RabbitMQRelationAdapter, self).__init__(relation, add_accessors)
@property
def host(self):
"""
Hostname that should be used to access RabbitMQ.
"""
if self.vip:
return self.vip
else:
return self.private_address
@property
def hosts(self):
"""
Comma separated list of hosts that should be used
to access RabbitMQ.
"""
hosts = self.relation.rabbitmq_hosts()
if len(hosts) > 1:
return ','.join(hosts)
else:
return None
class DatabaseRelationAdapter(OpenStackRelationAdapter):
"""
Adapter for the Database relation interface.
"""
interface_type = "database"
def __init__(self, relation):
add_accessors = ['password', 'username', 'database']
super(DatabaseRelationAdapter, self).__init__(relation, add_accessors)
@property
def host(self):
"""
Hostname that should be used to access RabbitMQ.
"""
return self.relation.db_host()
@property
def type(self):
return 'mysql'
def get_uri(self, prefix=None):
if prefix:
uri = 'mysql://{}:{}@{}/{}'.format(
self.relation.username(prefix=prefix),
self.relation.password(prefix=prefix),
self.host,
self.relation.database(prefix=prefix),
)
else:
uri = 'mysql://{}:{}@{}/{}'.format(
self.username,
self.password,
self.host,
self.database,
)
try:
if self.ssl_ca:
uri = '{}?ssl_ca={}'.format(uri, self.ssl_ca)
if self.ssl_cert:
uri = ('{}&ssl_cert={}&ssl_key={}'
.format(uri, self.ssl_cert, self.ssl_key))
except AttributeError:
# ignore ssl_ca or ssl_cert if not available
pass
return uri
@property
def uri(self):
return self.get_uri()
class ConfigurationAdapter(object):
"""
Configuration Adapter which provides python based access
to all configuration options for the current charm.
"""
def __init__(self):
_config = hookenv.config()
for k, v in _config.items():
k = k.replace('-', '_')
setattr(self, k, v)
class OpenStackRelationAdapters(object):
"""
Base adapters class for OpenStack Charms, used to aggregate
the relations associated with a particular charm so that their
properties can be accessed using dot notation, e.g:
adapters.amqp.private_address
"""
relation_adapters = {}
"""
Dictionary mapping relation names to adapter classes, e.g:
relation_adapters = {
'amqp': RabbitMQRelationAdapter,
}
By default, relations will be wrapped in an OpenStackRelationAdapter.
"""
_adapters = {
'amqp': RabbitMQRelationAdapter,
'shared_db': DatabaseRelationAdapter,
}
"""
Default adapter mappings; may be overridden by relation adapters
in subclasses.
"""
def __init__(self, relations, options=ConfigurationAdapter):
self._adapters.update(self.relation_adapters)
self._relations = []
for relation in relations:
relation_name = relation.relation_name.replace('-', '_')
try:
relation_value = self._adapters[relation_name](relation)
except KeyError:
relation_value = OpenStackRelationAdapter(relation)
setattr(self, relation_name, relation_value)
self._relations.append(relation_name)
self.options = options()
self._relations.append('options')
def __iter__(self):
"""
Iterate over the relations presented to the charm.
"""
for relation in self._relations:
yield relation, getattr(self, relation)

View File

@ -1,176 +0,0 @@
"""Classes to support writing re-usable charms in the reactive framework"""
from __future__ import absolute_import
import subprocess
import os
from contextlib import contextmanager
from collections import OrderedDict
from charmhelpers.contrib.openstack.utils import (
configure_installation_source,
)
from charmhelpers.core.host import path_hash, service_restart
from charmhelpers.core.hookenv import config, status_set
from charmhelpers.fetch import (
apt_install,
apt_update,
filter_installed_packages,
)
from charmhelpers.contrib.openstack.templating import get_loader
from charmhelpers.core.templating import render
from charmhelpers.core.hookenv import leader_get, leader_set
from charms.reactive.bus import set_state, remove_state
from charm.openstack.ip import PUBLIC, INTERNAL, ADMIN, canonical_url
class OpenStackCharm(object):
"""
Base class for all OpenStack Charm classes;
encapulates general OpenStack charm payload operations
"""
name = 'charmname'
packages = []
"""Packages to install"""
api_ports = {}
"""
Dictionary mapping services to ports for public, admin and
internal endpoints
"""
service_type = None
"""Keystone endpoint type"""
default_service = None
"""Default service for the charm"""
restart_map = {}
sync_cmd = []
services = []
adapters_class = None
def __init__(self, interfaces=None):
self.config = config()
# XXX It's not always liberty!
self.release = 'liberty'
if interfaces and self.adapters_class:
self.adapter_instance = self.adapters_class(interfaces)
def install(self):
"""
Install packages related to this charm based on
contents of packages attribute.
"""
packages = filter_installed_packages(self.packages)
if packages:
status_set('maintenance', 'Installing packages')
apt_install(packages, fatal=True)
self.set_state('{}-installed'.format(self.name))
def set_state(self, state, value=None):
set_state(state, value)
def remove_state(self, state):
remove_state(state)
def api_port(self, service, endpoint_type=PUBLIC):
"""
Determine the API port for a particular endpoint type
"""
return self.api_ports[service][endpoint_type]
def configure_source(self):
"""Configure installation source"""
configure_installation_source(self.config['openstack-origin'])
apt_update(fatal=True)
@property
def region(self):
"""OpenStack Region"""
return self.config['region']
@property
def public_url(self):
"""Public Endpoint URL"""
return "{}:{}".format(canonical_url(PUBLIC),
self.api_port(self.default_service,
PUBLIC))
@property
def admin_url(self):
"""Admin Endpoint URL"""
return "{}:{}".format(canonical_url(ADMIN),
self.api_port(self.default_service,
ADMIN))
@property
def internal_url(self):
"""Internal Endpoint URL"""
return "{}:{}".format(canonical_url(INTERNAL),
self.api_port(self.default_service,
INTERNAL))
@contextmanager
def restart_on_change(self):
checksums = {path: path_hash(path) for path in self.restart_map.keys()}
yield
restarts = []
for path in self.restart_map:
if path_hash(path) != checksums[path]:
restarts += self.restart_map[path]
services_list = list(OrderedDict.fromkeys(restarts).keys())
for service_name in services_list:
service_restart(service_name)
def render_all_configs(self):
self.render_configs(self.restart_map.keys())
def render_configs(self, configs):
with self.restart_on_change():
for conf in configs:
render(source=os.path.basename(conf),
template_loader=get_loader('templates/', self.release),
target=conf,
context=self.adapter_instance)
def restart_all(self):
for svc in self.services:
service_restart(svc)
def db_sync(self):
sync_done = leader_get(attribute='db-sync-done')
if not sync_done:
subprocess.check_call(self.sync_cmd)
leader_set({'db-sync-done': True})
# Restart services immediatly after db sync as
# render_domain_config needs a working system
self.restart_all()
class OpenStackCharmFactory(object):
releases = {}
"""
Dictionary mapping OpenStack releases to their associated
Charm class for this charm
"""
first_release = "icehouse"
"""
First OpenStack release which this factory supports Charms for
"""
@classmethod
def charm(cls, release=None, interfaces=None):
"""
Get an instance of the right charm for the
configured OpenStack series
"""
if release and release in cls.releases:
return cls.releases[release](interfaces=interfaces)
else:
return cls.releases[cls.first_release](interfaces=interfaces)

View File

@ -1,77 +0,0 @@
from charmhelpers.core.hookenv import (
config,
unit_get,
)
from charmhelpers.contrib.network.ip import (
get_address_in_network,
is_address_in_network,
is_ipv6,
get_ipv6_addr,
)
from charmhelpers.contrib.hahelpers.cluster import is_clustered
PUBLIC = 'public'
INTERNAL = 'int'
ADMIN = 'admin'
_ADDRESS_MAP = {
PUBLIC: {
'config': 'os-public-network',
'fallback': 'public-address'
},
INTERNAL: {
'config': 'os-internal-network',
'fallback': 'private-address'
},
ADMIN: {
'config': 'os-admin-network',
'fallback': 'private-address'
}
}
def canonical_url(endpoint_type=PUBLIC):
'''
Returns the correct HTTP URL to this host given the state of HTTPS
configuration, hacluster and charm configuration.
:endpoint_type str: The endpoint type to resolve.
:returns str: Base URL for services on the current service unit.
'''
scheme = 'http'
# if 'https' in configs.complete_contexts():
# scheme = 'https'
address = resolve_address(endpoint_type)
if is_ipv6(address):
address = "[{}]".format(address)
return "{0}://{1}".format(scheme, address)
def resolve_address(endpoint_type=PUBLIC):
resolved_address = None
if is_clustered():
if config(_ADDRESS_MAP[endpoint_type]['config']) is None:
# Assume vip is simple and pass back directly
resolved_address = config('vip')
else:
for vip in config('vip').split():
if is_address_in_network(
config(_ADDRESS_MAP[endpoint_type]['config']),
vip):
resolved_address = vip
else:
if config('prefer-ipv6'):
fallback_addr = get_ipv6_addr(exc_list=[config('vip')])[0]
else:
fallback_addr = unit_get(_ADDRESS_MAP[endpoint_type]['fallback'])
resolved_address = get_address_in_network(
config(_ADDRESS_MAP[endpoint_type]['config']), fallback_addr)
if resolved_address is None:
raise ValueError('Unable to resolve a suitable IP address'
' based on charm state and configuration')
else:
return resolved_address

View File

@ -1,7 +0,0 @@
flake8>=2.2.4,<=2.4.1
os-testr>=0.4.1
paramiko<2.0
charm-tools>=2.0.0
charms.reactive
mock>=1.2
coverage>=3.6

25
tox.ini
View File

@ -1,25 +0,0 @@
[tox]
envlist = pep8,py27
skipsdist = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
PYTHONHASHSEED=0
install_command =
pip install --allow-unverified python-apt {opts} {packages}
commands = ostestr {posargs}
[testenv:py27]
basepython = python2.7
deps = -r{toxinidir}/test-requirements.txt
[testenv:pep8]
basepython = python2.7
deps = -r{toxinidir}/test-requirements.txt
commands = flake8 {posargs} lib unit_tests
[testenv:venv]
commands = {posargs}
[flake8]
ignore = E402,E226

View File

@ -1,8 +0,0 @@
import sys
import mock
sys.path.append('./lib')
# mock out some charmhelpers libraries as they have apt install side effects
sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()

View File

@ -1,198 +0,0 @@
# Note that the unit_tests/__init__.py has the following lines to stop
# side effects from the imorts from charm helpers.
# sys.path.append('./lib')
# mock out some charmhelpers libraries as they have apt install side effects
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
import unittest
import mock
import charm.openstack.adapters as adapters
class MyRelation(object):
auto_accessors = ['this', 'that']
relation_name = 'my-name'
def this(self):
return 'this'
def that(self):
return 'that'
def some(self):
return 'thing'
class TestOpenStackRelationAdapter(unittest.TestCase):
def test_class(self):
ad = adapters.OpenStackRelationAdapter(MyRelation(), ['some'])
self.assertEqual(ad.this, 'this')
self.assertEqual(ad.that, 'that')
self.assertEqual(ad.some, 'thing')
self.assertEqual(ad.relation_name, 'my-name')
with self.assertRaises(AttributeError):
ad.relation_name = 'hello'
class FakeRabbitMQRelation():
auto_accessors = ['vip', 'private_address']
relation_name = 'amqp'
def __init__(self, vip=None):
self._vip = vip
def vip(self):
return self._vip
def private_address(self):
return 'private-address'
def rabbitmq_hosts(self):
return ['host1', 'host2']
def vhost(self):
return 'vhost'
def username(self):
return 'fakename'
class TestRabbitMQRelationAdapter(unittest.TestCase):
def test_class(self):
fake = FakeRabbitMQRelation(None)
mq = adapters.RabbitMQRelationAdapter(fake)
self.assertEqual(mq.vhost, 'vhost')
self.assertEqual(mq.username, 'fakename')
self.assertEqual(mq.host, 'private-address')
# TODO: can't do the following 2 lines as not dynamic accessors
# fake._vip = 'vip1'
# self.assertEqual(mq.host, 'vip1')
self.assertEqual(mq.hosts, 'host1,host2')
class FakeDatabaseRelation():
auto_accessors = []
relation_name = 'shared_db'
def db_host(self):
return 'host1'
def username(self, prefix=''):
return 'username1{}'.format(prefix)
def password(self, prefix=''):
return 'password1{}'.format(prefix)
def database(self, prefix=''):
return 'database1{}'.format(prefix)
class SSLDatabaseRelationAdapter(adapters.DatabaseRelationAdapter):
ssl_ca = 'my-ca'
ssl_cert = 'my-cert'
ssl_key = 'my-key'
class TestDatabaseRelationAdapter(unittest.TestCase):
def test_class(self):
fake = FakeDatabaseRelation()
db = adapters.DatabaseRelationAdapter(fake)
self.assertEqual(db.host, 'host1')
self.assertEqual(db.type, 'mysql')
self.assertEqual(db.password, 'password1')
self.assertEqual(db.username, 'username1')
self.assertEqual(db.database, 'database1')
self.assertEqual(db.uri, 'mysql://username1:password1@host1/database1')
self.assertEqual(db.get_uri('x'),
'mysql://username1x:password1x@host1/database1x')
# test the ssl feature of the base class
db = SSLDatabaseRelationAdapter(fake)
self.assertEqual(db.uri,
'mysql://username1:password1@host1/database1'
'?ssl_ca=my-ca'
'&ssl_cert=my-cert&ssl_key=my-key')
class TestConfigurationAdapter(unittest.TestCase):
def test_class(self):
test_config = {
'one': 1,
'two': 2,
'three': 3,
'that-one': 4
}
with mock.patch.object(adapters.hookenv, 'config',
new=lambda: test_config):
c = adapters.ConfigurationAdapter()
self.assertEqual(c.one, 1)
self.assertEqual(c.three, 3)
self.assertEqual(c.that_one, 4)
class TestOpenStackRelationAdapters(unittest.TestCase):
# test the OpenStackRelationAdapters() class, and then derive from it to
# test the additonal relation_adapters member on __init__
def test_class(self):
test_config = {
'one': 1,
'two': 2,
'three': 3,
'that-one': 4
}
with mock.patch.object(adapters.hookenv, 'config',
new=lambda: test_config):
amqp = FakeRabbitMQRelation()
shared_db = FakeDatabaseRelation()
mine = MyRelation()
a = adapters.OpenStackRelationAdapters([amqp, shared_db, mine])
self.assertEqual(a.amqp.private_address, 'private-address')
self.assertEqual(a.my_name.this, 'this')
items = list(a)
self.assertEqual(items[0][0], 'amqp')
self.assertEqual(items[1][0], 'shared_db')
self.assertEqual(items[2][0], 'my_name')
self.assertEqual(items[3][0], 'options')
class MyRelationAdapter(adapters.OpenStackRelationAdapter):
@property
def us(self):
return self.this + '-us'
class MyOpenStackRelationAdapters(adapters.OpenStackRelationAdapters):
relation_adapters = {
'my_name': MyRelationAdapter,
}
class TestCustomOpenStackRelationAdapters(unittest.TestCase):
def test_class(self):
test_config = {
'one': 1,
'two': 2,
'three': 3,
'that-one': 4
}
with mock.patch.object(adapters.hookenv, 'config',
new=lambda: test_config):
amqp = FakeRabbitMQRelation()
shared_db = FakeDatabaseRelation()
mine = MyRelation()
a = MyOpenStackRelationAdapters([amqp, shared_db, mine])
self.assertEqual(a.my_name.us, 'this-us')

View File

@ -1,285 +0,0 @@
# Note that the unit_tests/__init__.py has the following lines to stop
# side effects from the imorts from charm helpers.
# sys.path.append('./lib')
# mock out some charmhelpers libraries as they have apt install side effects
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
import mock
import utils
import charm.openstack.charm as chm
TEST_CONFIG = {'config': True}
class BaseOpenStackCharmTest(utils.BaseTestCase):
@classmethod
def setUpClass(cls):
cls.patched_config = mock.patch.object(chm, 'config')
cls.patched_config_started = cls.patched_config.start()
@classmethod
def tearDownClass(cls):
cls.patched_config.stop()
cls.patched_config_started = None
cls.patched_config = None
def setUp(self, target_cls, test_config):
super(BaseOpenStackCharmTest, self).setUp()
# set up the return value on the mock before instantiating the class to
# get the config into the class.config.
chm.config.return_value = test_config
self.target = target_cls()
def tearDown(self):
self.target = None
super(BaseOpenStackCharmTest, self).tearDown()
def patch_target(self, attr, return_value=None, name=None, new=None):
# uses BaseTestCase.patch_object() to patch targer.
self.patch_object(self.target, attr, return_value, name, new)
class TestOpenStackCharm(BaseOpenStackCharmTest):
# Note that this only tests the OpenStackCharm() class, which has not very
# useful defaults for testing. In order to test all the code without too
# many mocks, a separate test dervied charm class is used below.
def setUp(self):
super(TestOpenStackCharm, self).setUp(chm.OpenStackCharm, TEST_CONFIG)
def test__init__(self):
# Note cls.setUpClass() creates an OpenStackCharm() instance
self.assertEqual(chm.config(), TEST_CONFIG)
self.assertEqual(self.target.config, TEST_CONFIG)
self.assertEqual(self.target.release, 'liberty')
def test_install(self):
# only tests that the default set_state is called
self.patch_target('set_state')
self.patch_object(chm, 'filter_installed_packages',
name='fip',
return_value=None)
self.target.install()
self.target.set_state.assert_called_once_with('charmname-installed')
self.fip.assert_called_once_with([])
def test_set_state(self):
# tests that OpenStackCharm.set_state() calls set_state() global
self.patch_object(chm, 'set_state')
self.target.set_state('hello')
self.set_state.assert_called_once_with('hello', None)
self.set_state.reset_mock()
self.target.set_state('hello', 'there')
self.set_state.assert_called_once_with('hello', 'there')
def test_remove_state(self):
# tests that OpenStackCharm.remove_state() calls remove_state() global
self.patch_object(chm, 'remove_state')
self.target.remove_state('hello')
self.remove_state.assert_called_once_with('hello')
def test_configure_source(self):
self.patch_object(chm, 'configure_installation_source', name='cis')
self.patch_object(chm, 'apt_update')
self.patch_target('config', new={'openstack-origin': 'an-origin'})
self.target.configure_source()
self.cis.assert_called_once_with('an-origin')
self.apt_update.assert_called_once_with(fatal=True)
def test_region(self):
self.patch_target('config', new={'region': 'a-region'})
self.assertEqual(self.target.region, 'a-region')
def test_restart_on_change(self):
from collections import OrderedDict
hashs = OrderedDict([
('path1', 100),
('path2', 200),
('path3', 300),
('path4', 400),
])
self.target.restart_map = {
'path1': ['s1'],
'path2': ['s2'],
'path3': ['s3'],
'path4': ['s2', 's4'],
}
self.patch_object(chm, 'path_hash')
self.path_hash.side_effect = lambda x: hashs[x]
self.patch_object(chm, 'service_restart')
# slightly awkard, in that we need to test a context manager
with self.target.restart_on_change():
# test with no restarts
pass
self.assertEqual(self.service_restart.call_count, 0)
with self.target.restart_on_change():
# test with path1 and path3 restarts
for k in ['path1', 'path3']:
hashs[k] += 1
self.assertEqual(self.service_restart.call_count, 2)
self.service_restart.assert_any_call('s1')
self.service_restart.assert_any_call('s3')
# test with path2 and path4 and that s2 only gets restarted once
self.service_restart.reset_mock()
with self.target.restart_on_change():
for k in ['path2', 'path4']:
hashs[k] += 1
self.assertEqual(self.service_restart.call_count, 2)
calls = [mock.call('s2'), mock.call('s4')]
self.service_restart.assert_has_calls(calls)
def test_restart_all(self):
self.patch_object(chm, 'service_restart')
self.patch_target('services', new=['s1', 's2'])
self.target.restart_all()
self.assertEqual(self.service_restart.call_args_list,
[mock.call('s1'), mock.call('s2')])
def test_db_sync(self):
self.patch_object(chm, 'leader_get')
self.patch_object(chm, 'leader_set')
self.patch_object(chm, 'subprocess', name='subprocess')
self.patch_target('restart_all')
# first check with leader_get returning True
self.leader_get.return_value = True
self.target.db_sync()
self.leader_get.assert_called_once_with(attribute='db-sync-done')
self.subprocess.check_call.assert_not_called()
self.leader_set.assert_not_called()
self.restart_all.assert_not_called()
# Now check with leader_get returning False
self.leader_get.reset_mock()
self.leader_get.return_value = False
self.target.sync_cmd = ['a', 'cmd']
self.target.db_sync()
self.leader_get.assert_called_once_with(attribute='db-sync-done')
self.subprocess.check_call.assert_called_once_with(['a', 'cmd'])
self.leader_set.assert_called_once_with({'db-sync-done': True})
self.restart_all.assert_called_once_with()
class MyAdapter(object):
def __init__(self, interfaces):
self.interfaces = interfaces
class MyOpenStackCharm(chm.OpenStackCharm):
name = 'my-charm'
packages = ['p1', 'p2', 'p3', 'package-to-filter']
api_ports = {
'service1': {
chm.PUBLIC: 1,
chm.INTERNAL: 2,
},
'service2': {
chm.PUBLIC: 3,
},
'my-default-service': {
chm.PUBLIC: 1234,
chm.ADMIN: 2468,
chm.INTERNAL: 3579,
},
}
service_type = 'my-service-type'
default_service = 'my-default-service'
restart_map = {
'path1': ['s1'],
'path2': ['s2'],
'path3': ['s3'],
'path4': ['s2', 's4'],
}
sync_cmd = ['my-sync-cmd', 'param1']
services = ['my-default-service', 'my-second-service']
adapters_class = MyAdapter
class TestMyOpenStackCharm(BaseOpenStackCharmTest):
def setUp(self):
def make_open_stack_charm():
return MyOpenStackCharm(['interface1', 'interface2'])
super(TestMyOpenStackCharm, self).setUp(make_open_stack_charm,
TEST_CONFIG)
def test_install(self):
# tests that the packages are filtered before installation
self.patch_target('set_state')
self.patch_object(chm, 'filter_installed_packages',
return_value=None,
name='fip')
self.fip.side_effect = lambda x: ['p1', 'p2']
self.patch_object(chm, 'status_set')
self.patch_object(chm, 'apt_install')
self.target.install()
self.target.set_state.assert_called_once_with('my-charm-installed')
self.fip.assert_called_once_with(self.target.packages)
self.status_set.assert_called_once_with('maintenance',
'Installing packages')
def test_api_port(self):
self.assertEqual(self.target.api_port('service1'), 1)
self.assertEqual(self.target.api_port('service1', chm.PUBLIC), 1)
self.assertEqual(self.target.api_port('service2'), 3)
with self.assertRaises(KeyError):
self.target.api_port('service3')
with self.assertRaises(KeyError):
self.target.api_port('service2', chm.INTERNAL)
def test_public_url(self):
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
self.assertEqual(self.target.public_url, 'my-ip-address:1234')
self.canonical_url.assert_called_once_with(chm.PUBLIC)
def test_admin_url(self):
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
self.assertEqual(self.target.admin_url, 'my-ip-address:2468')
self.canonical_url.assert_called_once_with(chm.ADMIN)
def test_internal_url(self):
self.patch_object(chm, 'canonical_url', return_value='my-ip-address')
self.assertEqual(self.target.internal_url, 'my-ip-address:3579')
self.canonical_url.assert_called_once_with(chm.INTERNAL)
def test_render_all_configs(self):
self.patch_target('render_configs')
self.target.render_all_configs()
self.assertEqual(self.render_configs.call_count, 1)
args = self.render_configs.call_args_list[0][0][0]
self.assertEqual(['path1', 'path2', 'path3', 'path4'],
sorted(args))
def test_render_configs(self):
# give us a way to check that the context manager was called.
from contextlib import contextmanager
d = [0]
@contextmanager
def fake_restart_on_change():
d[0] += 1
yield
self.patch_target('restart_on_change', new=fake_restart_on_change)
self.patch_object(chm, 'render')
self.patch_object(chm, 'get_loader', return_value='my-loader')
# self.patch_target('adapter_instance', new='my-adapter')
self.target.render_configs(['path1'])
self.assertEqual(d[0], 1)
self.render.assert_called_once_with(
source='path1',
template_loader='my-loader',
target='path1',
context=mock.ANY)
# assert the context was an MyAdapter instance.
context = self.render.call_args_list[0][1]['context']
assert isinstance(context, MyAdapter)
self.assertEqual(context.interfaces, ['interface1', 'interface2'])

View File

@ -1,108 +0,0 @@
# Note that the unit_tests/__init__.py has the following lines to stop
# side effects from the imorts from charm helpers.
# sys.path.append('./lib')
# mock out some charmhelpers libraries as they have apt install side effects
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
import utils
import charm.openstack.ip as ip
class TestCharmOpenStackIp(utils.BaseTestCase):
def test_canonical_url(self):
self.patch_object(ip, 'resolve_address', return_value='address1')
self.patch_object(ip, 'is_ipv6', return_value=False)
# not ipv6
url = ip.canonical_url()
self.assertEqual(url, 'http://address1')
self.resolve_address.assert_called_once_with(ip.PUBLIC)
# is ipv6
self.is_ipv6.return_value = True
self.resolve_address.reset_mock()
url = ip.canonical_url()
self.assertEqual(url, 'http://[address1]')
self.resolve_address.assert_called_once_with(ip.PUBLIC)
# test we check for enpoint type
self.is_ipv6.return_value = False
self.resolve_address.reset_mock()
url = ip.canonical_url(ip.INTERNAL)
self.resolve_address.assert_called_once_with(ip.INTERNAL)
def test_resolve_address(self):
self.patch_object(ip, 'is_clustered')
self.patch_object(ip, 'config')
self.patch_object(ip, 'is_address_in_network')
self.patch_object(ip, 'get_ipv6_addr')
self.patch_object(ip, 'unit_get')
self.patch_object(ip, 'get_address_in_network')
# define a fake_config() that returns predictable results and remembers
# what it was called with.
calls_list = []
_config = {
'vip': 'vip-address',
'prefer-ipv6': False,
'os-public-network': 'the-public-network',
'os-internal-network': 'the-internal-network',
'os-admin-network': 'the-admin-network',
}
def fake_config(*args):
calls_list.append(args)
return _config[args[0]]
self.config.side_effect = fake_config
# first test, if not clustered, that the function uses unit_get() and
# get_address_in_network to get a real address.
# for the default PUBLIC endpoint
self.is_clustered.return_value = False
self.get_address_in_network.return_value = 'got-address'
self.unit_get.return_value = 'unit-get-address'
addr = ip.resolve_address()
self.assertEqual(addr, 'got-address')
self.assertEqual(calls_list,
[('prefer-ipv6',), ('os-public-network',)])
self.unit_get.assert_called_once_with('public-address')
self.get_address_in_network.assert_called_once_with(
'the-public-network', 'unit-get-address')
# second test: not clusted, prefer-ipv6 is True
_config['prefer-ipv6'] = True
calls_list = []
self.get_ipv6_addr.return_value = ['ipv6-addr']
self.get_address_in_network.reset_mock()
addr = ip.resolve_address()
self.get_ipv6_addr.assert_called_once_with(exc_list=['vip-address'])
self.get_address_in_network.assert_called_once_with(
'the-public-network', 'ipv6-addr')
# Third test: clustered, and config(...) returns None
self.is_clustered.return_value = True
_config['os-public-network'] = None
calls_list = []
addr = ip.resolve_address()
self.assertEqual(calls_list, [('os-public-network',), ('vip',)])
# Fourth test: clustered, and config(...) returns not None
_config['os-public-network'] = 'the-public-network'
calls_list = []
_config['vip'] = 'vip1 vip2'
self.is_address_in_network.return_value = (False, True)
addr = ip.resolve_address()
self.assertEqual(calls_list, [
('os-public-network',),
('vip',),
('os-public-network',),
('os-public-network',)])
self.assertEqual(addr, 'vip2')
# Finally resolved_address returns None -> ValueError()
# allow vip to not be found:
self.is_address_in_network.return_value = False
with self.assertRaises(ValueError):
addr = ip.resolve_address()

View File

@ -1,48 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Note that the unit_tests/__init__.py also mocks out two charmhelpers imports
# that have side effects that try to apt install modules:
# sys.modules['charmhelpers.contrib.openstack.utils'] = mock.MagicMock()
# sys.modules['charmhelpers.contrib.network.ip'] = mock.MagicMock()
import unittest
import mock
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._patches = {}
self._patches_start = {}
def tearDown(self):
for k, v in self._patches.items():
v.stop()
setattr(self, k, None)
self._patches = None
self._patches_start = None
def patch_object(self, obj, attr, return_value=None, name=None, new=None):
if name is None:
name = attr
if new is not None:
mocked = mock.patch.object(obj, attr, new=new)
else:
mocked = mock.patch.object(obj, attr)
self._patches[name] = mocked
started = mocked.start()
if new is None:
started.return_value = return_value
self._patches_start[name] = started
setattr(self, name, started)

View File

@ -1 +1,3 @@
netifaces>=0.10.4
#charms_openstack
git+https://github.com/openstack-charmers/charms.openstack.git#egg=charms.openstack