Update MESO client and tox

Change-Id: I01079a8bce13cb1228a990de2827a19a53c0ccca
Signed-off-by: Tung Doan <doantungbk.203@gmail.com>
This commit is contained in:
Tung Doan 2018-09-07 11:44:31 -07:00
parent a8a09446a5
commit be59d30267
34 changed files with 504 additions and 1549 deletions

1
.gitignore vendored
View File

@ -20,6 +20,7 @@ run_tests.log
.idea/
.tox/
.venv/
.stestr/
# Files created by releasenotes build
releasenotes/build

3
.stestr.conf Normal file
View File

@ -0,0 +1,3 @@
[DEFAULT]
test_path=./apmecclient/tests/unit
top_path=./

View File

@ -1,4 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./apmecclient/tests/unit} $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -38,20 +38,20 @@ def make_client(instance):
url = url.rstrip("/")
if '1.0' == instance._api_version[API_NAME]:
client = apmec_client(username=instance._username,
tenant_name=instance._tenant_name,
password=instance._password,
region_name=instance._region_name,
auth_url=instance._auth_url,
endpoint_url=url,
endpoint_type=instance._endpoint_type,
token=instance._token,
auth_strategy=instance._auth_strategy,
insecure=instance._insecure,
ca_cert=instance._ca_cert,
retries=instance._retries,
raise_errors=instance._raise_errors,
session=instance._session,
auth=instance._auth)
tenant_name=instance._tenant_name,
password=instance._password,
region_name=instance._region_name,
auth_url=instance._auth_url,
endpoint_url=url,
endpoint_type=instance._endpoint_type,
token=instance._token,
auth_strategy=instance._auth_strategy,
insecure=instance._insecure,
ca_cert=instance._ca_cert,
retries=instance._retries,
raise_errors=instance._raise_errors,
session=instance._session,
auth=instance._auth)
return client
else:
raise exceptions.UnsupportedVersion(_("API version %s is not "

View File

@ -74,7 +74,7 @@ def _find_resourceid_by_name(client, resource, name):
info = data[collection]
if len(info) > 1:
raise exceptions.ApmecClientNoUniqueMatch(resource=resource,
name=name)
name=name)
elif len(info) == 0:
not_found_message = (_("Unable to find %(resource)s with name "
"'%(name)s'") %
@ -349,7 +349,7 @@ class ApmecCommandMeta(abc.ABCMeta):
cls_dict['log'] = logging.getLogger(
cls_dict['__module__'] + '.' + name)
return super(ApmecCommandMeta, cls).__new__(cls,
name, bases, cls_dict)
name, bases, cls_dict)
@six.add_metaclass(ApmecCommandMeta)

View File

@ -17,9 +17,9 @@
import yaml
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.common import exceptions
from apmecclient.i18n import _
from apmecclient.apmec import v1_0 as apmecV10
_MEA = 'mea'
@ -104,15 +104,15 @@ class CreateMEA(apmecV10.CreateCommand):
apmec_client.format = parsed_args.request_format
if parsed_args.vim_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'vim',
parsed_args.
vim_name)
'vim',
parsed_args.
vim_name)
parsed_args.vim_id = _id
if parsed_args.mead_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'mead',
parsed_args.
mead_name)
'mead',
parsed_args.
mead_name)
parsed_args.mead_id = _id
elif parsed_args.mead_template:
with open(parsed_args.mead_template) as f:
@ -132,8 +132,8 @@ class CreateMEA(apmecV10.CreateCommand):
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description',
'mead_id', 'vim_id'])
['tenant_id', 'name', 'description',
'mead_id', 'vim_id'])
return body
@ -163,7 +163,7 @@ class UpdateMEA(apmecV10.UpdateCommand):
raise exceptions.InvalidInput(e)
if parsed_args.config:
config = parsed_args.config
if isinstance(config, str) or isinstance(config, unicode):
if isinstance(config, str) or isinstance(config, unicode): # noqa
config_str = parsed_args.config.decode('unicode_escape')
try:
config = yaml.load(config_str, Loader=yaml.SafeLoader)
@ -210,8 +210,8 @@ class ListMEAResources(apmecV10.ListCommand):
apmec_client.format = parsed_args.request_format
if self.allow_names:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
self.resource,
parsed_args.id)
self.resource,
parsed_args.id)
else:
_id = parsed_args.id
@ -225,7 +225,7 @@ class ListMEAResources(apmecV10.ListCommand):
apmec_client.format = parsed_args.request_format
_extra_values = apmecV10.parse_args_to_dict(self.values_specs)
apmecV10._merge_args(self, parsed_args, _extra_values,
self.values_specs)
self.values_specs)
search_opts = self.args2search_opts(parsed_args)
search_opts.update(_extra_values)
if self.pagination_support:
@ -292,9 +292,9 @@ class ScaleMEA(apmecV10.ApmecCommand):
apmec_client = self.get_client()
apmec_client.format = parsed_args.request_format
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'mea',
parsed_args.
mea_name)
'mea',
parsed_args.
mea_name)
parsed_args.mea_id = _id
args['mea_id'] = parsed_args.mea_id

View File

@ -20,10 +20,9 @@ from __future__ import print_function
from oslo_serialization import jsonutils
import yaml
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.common import exceptions
from apmecclient.i18n import _
from apmecclient.apmec import v1_0 as apmecV10
_MEAD = "mead"
@ -88,7 +87,7 @@ class CreateMEAD(apmecV10.CreateCommand):
raise exceptions.InvalidInput("mead file is empty")
body[self.resource]['attributes'] = {'mead': mead}
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description'])
['tenant_id', 'name', 'description'])
return body

View File

@ -0,0 +1,163 @@
#
# Copyright 2013 Intel Corporation
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.common import exceptions
from apmecclient.i18n import _
_MECA = 'meca'
_RESOURCE = 'resource'
class ListMECA(apmecV10.ListCommand):
"""List MECA that belong to a given tenant."""
resource = _MECA
list_columns = ['id', 'name', 'mecad_id', 'mgmt_urls', 'status']
class ShowMECA(apmecV10.ShowCommand):
"""Show information of a given MECA."""
resource = _MECA
class CreateMECA(apmecV10.CreateCommand):
"""Create a MECA."""
resource = _MECA
remove_output_fields = ["attributes"]
def add_known_arguments(self, parser):
parser.add_argument(
'name', metavar='NAME',
help=_('Set a name for the MECA'))
parser.add_argument(
'--description',
help=_('Set description for the MECA'))
mecad_group = parser.add_mutually_exclusive_group(required=True)
mecad_group.add_argument(
'--mecad-id',
help=_('MECAD ID to use as template to create MECA'))
mecad_group.add_argument(
'--mecad-template',
help=_('MECAD file to create MECA'))
mecad_group.add_argument(
'--mecad-name',
help=_('MECAD name to use as template to create MECA'))
vim_group = parser.add_mutually_exclusive_group()
vim_group.add_argument(
'--vim-id',
help=_('VIM ID to use to create MECA on the specified VIM'))
vim_group.add_argument(
'--vim-name',
help=_('VIM name to use to create MECA on the specified VIM'))
parser.add_argument(
'--vim-region-name',
help=_('VIM Region to use to create MECA on the specified VIM'))
parser.add_argument(
'--param-file',
help=_('Specify parameter yaml file'))
def args2body(self, parsed_args):
args = {'attributes': {}}
body = {self.resource: args}
if parsed_args.vim_region_name:
args.setdefault('placement_attr', {})['region_name'] = \
parsed_args.vim_region_name
apmec_client = self.get_client()
apmec_client.format = parsed_args.request_format
if parsed_args.vim_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'vim',
parsed_args.
vim_name)
parsed_args.vim_id = _id
if parsed_args.mecad_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'mecad',
parsed_args.
mecad_name)
parsed_args.mecad_id = _id
elif parsed_args.mecad_template:
with open(parsed_args.mecad_template) as f:
template = f.read()
try:
args['mecad_template'] = yaml.load(
template, Loader=yaml.SafeLoader)
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
if not args['mecad_template']:
raise exceptions.InvalidInput('The mecad file is empty')
if parsed_args.param_file:
with open(parsed_args.param_file) as f:
param_yaml = f.read()
try:
args['attributes']['param_values'] = yaml.load(
param_yaml, Loader=yaml.SafeLoader)
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description',
'mecad_id', 'vim_id'])
return body
class DeleteMECA(apmecV10.DeleteCommand):
"""Delete given MECA(s)."""
resource = _MECA
deleted_msg = {'meca': 'delete initiated'}
class UpdateMECA(apmecV10.UpdateCommand):
"""Update a given MES."""
resource = _MECA
def add_known_arguments(self, parser):
parser.add_argument(
'--mecad-template',
help=_('MECAD file to update MECA')
)
def args2body(self, parsed_args):
args = {}
body = {self.resource: args}
apmec_client = self.get_client()
apmec_client.format = parsed_args.request_format
if parsed_args.mecad_template:
with open(parsed_args.mecad_template) as f:
template = f.read()
try:
args['mecad_template'] = yaml.load(
template, Loader=yaml.SafeLoader)
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
if not args['mecad_template']:
raise exceptions.InvalidInput('The mecad template is empty')
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id'])
return body

View File

@ -0,0 +1,103 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from oslo_serialization import jsonutils
import yaml
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.i18n import _
_MECAD = "mecad"
class ListMECAD(apmecV10.ListCommand):
"""List MECADs that belong to a given tenant."""
resource = _MECAD
list_columns = ['id', 'name', 'template_source', 'description']
def get_parser(self, prog_name):
parser = super(ListMECAD, self).get_parser(prog_name)
parser.add_argument(
'--template-source',
help=_("List MECAD with specified template source. Available \
options are 'onboared' (default), 'inline' or 'all'"),
action='store',
default='onboarded')
return parser
def args2search_opts(self, parsed_args):
search_opts = super(ListMECAD, self).args2search_opts(parsed_args)
template_source = parsed_args.template_source
if parsed_args.template_source:
search_opts.update({'template_source': template_source})
return search_opts
class ShowMECAD(apmecV10.ShowCommand):
"""Show information of a given MECAD."""
resource = _MECAD
class CreateMECAD(apmecV10.CreateCommand):
"""Create a MECAD."""
resource = _MECAD
remove_output_fields = ["attributes"]
def add_known_arguments(self, parser):
parser.add_argument('--mecad-file', help='Specify MECAD file',
required=True)
parser.add_argument(
'name', metavar='NAME',
help='Set a name for the MECAD')
parser.add_argument(
'--description',
help='Set a description for the MECAD')
def args2body(self, parsed_args):
body = {self.resource: {}}
mecad = None
with open(parsed_args.mecad_file) as f:
mecad = yaml.safe_load(f.read())
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description'])
if mecad:
body[self.resource]['attributes'] = {'mecad': mecad}
return body
class DeleteMECAD(apmecV10.DeleteCommand):
"""Delete a given MECAD."""
resource = _MECAD
class ShowTemplateMECAD(apmecV10.ShowCommand):
"""Show template of a given MECAD."""
resource = _MECAD
def run(self, parsed_args):
self.log.debug('run(%s)', parsed_args)
template = None
data = self.get_data(parsed_args)
try:
attributes_index = data[0].index('attributes')
attributes_json = data[1][attributes_index]
template = jsonutils.loads(attributes_json).get('mecad', None)
except (IndexError, TypeError, ValueError) as e:
self.log.debug('Data handling error: %s', str(e))
print(template or _('Unable to display MECAD template!'))

View File

@ -18,10 +18,11 @@ import yaml
from oslo_utils import strutils
from apmecclient.common import exceptions
from apmecclient.i18n import _
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.apmec.v1_0.meo import vim_utils
from apmecclient.common import exceptions
from apmecclient.i18n import _
_VIM = "vim"
@ -77,14 +78,14 @@ class CreateVIM(apmecV10.CreateCommand):
auth_url = config_param.pop('auth_url')
except KeyError:
raise exceptions.ApmecClientException(message='Auth URL must be '
'specified',
status_code=404)
'specified',
status_code=404)
vim_obj['auth_url'] = vim_utils.validate_auth_url(auth_url).geturl()
vim_obj['type'] = config_param.pop('type', 'openstack')
vim_utils.args2body_vim(config_param, vim_obj)
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description',
'is_default'])
['tenant_id', 'name', 'description',
'is_default'])
return body
@ -125,8 +126,8 @@ class UpdateVIM(apmecV10.UpdateCommand):
if config_param is not None:
vim_utils.args2body_vim(config_param, vim_obj)
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description',
'is_default'])
['tenant_id', 'name', 'description',
'is_default'])
return body

View File

@ -29,8 +29,8 @@ def args2body_vim(config_param, vim):
config_param.pop('project_domain_name', '')}
if not vim['vim_project']['name']:
raise exceptions.ApmecClientException(message='Project name '
'must be specified',
status_code=404)
'must be specified',
status_code=404)
vim['auth_cred'] = {'username': config_param.pop('username', ''),
'password': config_param.pop('password', ''),
'user_domain_name':

View File

View File

@ -1,20 +1,25 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICEMESE-2.0
# Copyright 2013 Intel Corporation
# All Rights Reserved.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIOMES OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.common import exceptions
from apmecclient.i18n import _
from apmecclient.apmec import v1_0 as apmecV10
_MES = 'mes'
@ -82,15 +87,15 @@ class CreateMES(apmecV10.CreateCommand):
apmec_client.format = parsed_args.request_format
if parsed_args.vim_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'vim',
parsed_args.
vim_name)
'vim',
parsed_args.
vim_name)
parsed_args.vim_id = _id
if parsed_args.mesd_name:
_id = apmecV10.find_resourceid_by_name_or_id(apmec_client,
'mesd',
parsed_args.
mesd_name)
'mesd',
parsed_args.
mesd_name)
parsed_args.mesd_id = _id
elif parsed_args.mesd_template:
with open(parsed_args.mesd_template) as f:
@ -112,8 +117,8 @@ class CreateMES(apmecV10.CreateCommand):
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description',
'mesd_id', 'vim_id'])
['tenant_id', 'name', 'description',
'mesd_id', 'vim_id'])
return body
@ -122,3 +127,37 @@ class DeleteMES(apmecV10.DeleteCommand):
resource = _MES
deleted_msg = {'mes': 'delete initiated'}
class UpdateMES(apmecV10.UpdateCommand):
"""Update a given MES."""
resource = _MES
def add_known_arguments(self, parser):
parser.add_argument(
'--mesd-template',
help=_('MESD file to update MES')
)
def args2body(self, parsed_args):
args = {}
body = {self.resource: args}
apmec_client = self.get_client()
apmec_client.format = parsed_args.request_format
if parsed_args.mesd_template:
with open(parsed_args.mesd_template) as f:
template = f.read()
try:
args['mesd_template'] = yaml.load(
template, Loader=yaml.SafeLoader)
except yaml.YAMLError as e:
raise exceptions.InvalidInput(e)
if not args['mesd_template']:
raise exceptions.InvalidInput('The mesd template is empty')
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id'])
return body

View File

@ -16,9 +16,10 @@ import yaml
from oslo_serialization import jsonutils
from apmecclient.i18n import _
from apmecclient.apmec import v1_0 as apmecV10
from apmecclient.i18n import _
_MESD = "mesd"
@ -73,7 +74,7 @@ class CreateMESD(apmecV10.CreateCommand):
with open(parsed_args.mesd_file) as f:
mesd = yaml.safe_load(f.read())
apmecV10.update_dict(parsed_args, body[self.resource],
['tenant_id', 'name', 'description'])
['tenant_id', 'name', 'description'])
if mesd:
body[self.resource]['attributes'] = {'mesd': mesd}

View File

@ -16,9 +16,9 @@
"""Manage access to the clients, including authenticating when needed.
"""
from apmecclient.apmec import client as apmec_client
from apmecclient import client
from apmecclient.apmec import client as apmec_client
class ClientCache(object):

View File

@ -39,19 +39,25 @@ import six.moves.urllib.parse as urlparse
from cliff import app
from cliff import commandmanager
from apmecclient.apmec.v1_0.events import events
from apmecclient.apmec.v1_0 import extension
from apmecclient.apmec.v1_0.mem import mea
from apmecclient.apmec.v1_0.mem import mead
from apmecclient.apmec.v1_0.meso import mes
from apmecclient.apmec.v1_0.meso import mesd
from apmecclient.apmec.v1_0.meo import meca
from apmecclient.apmec.v1_0.meo import mecad
from apmecclient.apmec.v1_0.meo import vim
from apmecclient.common import clientmanager
from apmecclient.common import command as openstack_command
from apmecclient.common import exceptions as exc
from apmecclient.common import extension as client_extension
from apmecclient.common import utils
from apmecclient.i18n import _
from apmecclient.apmec.v1_0.events import events
from apmecclient.apmec.v1_0 import extension
from apmecclient.apmec.v1_0.meo import mes
from apmecclient.apmec.v1_0.meo import mesd
from apmecclient.apmec.v1_0.meo import vim
from apmecclient.apmec.v1_0.mem import mea
from apmecclient.apmec.v1_0.mem import mead
from apmecclient.version import __version__
@ -145,6 +151,19 @@ COMMAND_V1 = {
'mes-list': mes.ListMES,
'mes-delete': mes.DeleteMES,
'mes-show': mes.ShowMES,
'mes-update': mes.UpdateMES,
'mecad-create': mecad.CreateMECAD,
'mecad-list': mecad.ListMECAD,
'mecad-delete': mecad.DeleteMECAD,
'mecad-show': mecad.ShowMECAD,
'mecad-template-show': mecad.ShowTemplateMECAD,
'meca-create': meca.CreateMECA,
'meca-list': meca.ListMECA,
'meca-delete': meca.DeleteMECA,
'meca-show': meca.ShowMECA,
'meca-update': meca.UpdateMECA
}

View File

@ -1,370 +0,0 @@
# Copyright 2012 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import json
import uuid
from keystoneclient import exceptions as k_exceptions
import mock
import requests
import testtools
from apmecclient import client
from apmecclient.common import exceptions
USERNAME = 'testuser'
USER_ID = 'testuser_id'
TENANT_NAME = 'testtenant'
TENANT_ID = 'testtenant_id'
PASSWORD = 'password'
AUTH_URL = 'authurl'
ENDPOINT_URL = 'localurl'
ENDPOINT_OVERRIDE = 'otherurl'
TOKEN = 'tokentoken'
REGION = 'RegionTest'
NOAUTH = 'noauth'
KS_TOKEN_RESULT = {
'access': {
'token': {'id': TOKEN,
'expires': '2012-08-11T07:49:01Z',
'tenant': {'id': str(uuid.uuid1())}},
'user': {'id': str(uuid.uuid1())},
'serviceCatalog': [
{'endpoints_links': [],
'endpoints': [{'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL,
'region': REGION}],
'type': 'mec-orchestration',
'name': 'Apmec Service'}
]
}
}
ENDPOINTS_RESULT = {
'endpoints': [{
'type': 'mec-orchestration',
'name': 'Apmec Service',
'region': REGION,
'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL
}]
}
def get_response(status_code, headers=None):
response = mock.Mock().CreateMock(requests.Response)
response.headers = headers or {}
response.status_code = status_code
return response
resp_200 = get_response(200)
resp_401 = get_response(401)
headers = {'X-Auth-Token': '',
'User-Agent': 'python-apmecclient'}
expected_headers = {'X-Auth-Token': TOKEN,
'User-Agent': 'python-apmecclient'}
agent_header = {'User-Agent': 'python-apmecclient'}
class CLITestAuthNoAuth(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthNoAuth, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
endpoint_url=ENDPOINT_URL,
auth_strategy=NOAUTH,
region_name=REGION)
self.addCleanup(mock.patch.stopall)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_noauth(self, mock_request):
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_once_with(
ENDPOINT_URL + '/resource',
'GET',
headers=headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
class CLITestAuthKeystone(testtools.TestCase):
# Auth Body expected
auth_body = ('{"auth": {"tenantName": "testtenant", '
'"passwordCredentials": '
'{"username": "testuser", "password": "password"}}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystone, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
self.addCleanup(mock.patch.stopall)
def test_reused_token_get_auth_info(self):
"""Test Client.get_auth_info().
Test that Client.get_auth_info() works even if client was
instantiated with predefined token.
"""
client_ = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
token=TOKEN,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
expected = {'auth_token': TOKEN,
'auth_tenant_id': None,
'auth_user_id': None,
'endpoint_url': self.client.endpoint_url}
self.assertEqual(client_.get_auth_info(), expected)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_token(self, mock_request):
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
self.assertEqual(self.client.auth_token, TOKEN)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_refresh_token(self, mock_request):
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
# If a token is expired, apmec server retruns 401
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.Unauthorized,
self.client.do_request,
'/resource',
'GET')
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_refresh_token_no_auth_url(self, mock_request):
self.client.auth_url = None
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
# If a token is expired, apmec server retruns 401
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.NoAuthURLProvided,
self.client.do_request,
'/resource',
'GET')
expected_url = ENDPOINT_URL + '/resource'
mock_request.assert_called_with(expected_url, 'GET',
headers=expected_headers)
def test_get_endpoint_url_with_invalid_auth_url(self):
# Handle the case when auth_url is not provided
self.client.auth_url = None
self.assertRaises(exceptions.NoAuthURLProvided,
self.client._get_endpoint_url)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url(self, mock_request):
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, json.dumps(ENDPOINTS_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_use_given_endpoint_url(self, mock_request):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
endpoint_url=ENDPOINT_OVERRIDE)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_with(
ENDPOINT_OVERRIDE + '/resource', 'GET',
headers=headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url_other(self, mock_request):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, json.dumps(ENDPOINTS_RESULT))
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client.do_request,
'/resource',
'GET')
expected_url = AUTH_URL + '/tokens/%s/endpoints' % TOKEN
headers = {'User-Agent': 'python-apmecclient'}
mock_request.assert_called_with(expected_url, 'GET',
headers=headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url_failed(self, mock_request):
self.client.auth_token = TOKEN
self.client.auth_url = AUTH_URL + '/tokens/%s/endpoints' % TOKEN
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.Unauthorized,
self.client.do_request,
'/resource',
'GET')
def test_endpoint_type(self):
resources = copy.deepcopy(KS_TOKEN_RESULT)
endpoints = resources['access']['serviceCatalog'][0]['endpoints'][0]
endpoints['internalURL'] = 'internal'
endpoints['adminURL'] = 'admin'
endpoints['publicURL'] = 'public'
# Test default behavior is to choose public.
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION)
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test admin url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'admin')
# Test public url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test internal url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'internal')
# Test url that isn't found in the service catalog
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL')
self.assertRaises(k_exceptions.EndpointNotFound,
self.client._extract_service_catalog,
resources)
@mock.patch('apmecclient.client.HTTPClient.request')
@mock.patch('apmecclient.common.utils.http_log_req')
def test_strip_credentials_from_log(self, mock_http_log_req,
mock_request,):
body = ('{"auth": {"tenantId": "testtenant_id",'
'"passwordCredentials": {"password": "password",'
'"userId": "testuser_id"}}}')
expected_body = ('{"auth": {"tenantId": "testtenant_id",'
'"REDACTEDCredentials": {"REDACTED": "REDACTED",'
'"userId": "testuser_id"}}}')
_headers = {'headers': expected_headers, 'body': expected_body}
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET', body=body)
args, kwargs = mock_http_log_req.call_args
# Check that credentials are stripped while logging.
self.assertEqual(_headers, args[2])
class CLITestAuthKeystoneWithId(CLITestAuthKeystone):
# Auth Body expected
auth_body = ('{"auth": {"passwordCredentials": '
'{"password": "password", "userId": "testuser_id"}, '
'"tenantId": "testtenant_id"}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithId, self).setUp()
self.client = client.HTTPClient(user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class CLITestAuthKeystoneWithIdandName(CLITestAuthKeystone):
# Auth Body expected
auth_body = ('{"auth": {"passwordCredentials": '
'{"password": "password", "userId": "testuser_id"}, '
'"tenantId": "testtenant_id"}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithIdandName, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
user_id=USER_ID,
tenant_id=TENANT_ID,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)

View File

@ -1,119 +0,0 @@
# Copyright 2012 OpenStack Foundation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import testtools
from apmecclient.common import exceptions
from apmecclient.apmec import v1_0 as apmecV10
class CLITestArgs(testtools.TestCase):
def test_empty(self):
_mydict = apmecV10.parse_args_to_dict([])
self.assertEqual({}, _mydict)
def test_default_bool(self):
_specs = ['--my_bool', '--arg1', 'value1']
_mydict = apmecV10.parse_args_to_dict(_specs)
self.assertTrue(_mydict['my_bool'])
def test_bool_true(self):
_specs = ['--my-bool', 'type=bool', 'true', '--arg1', 'value1']
_mydict = apmecV10.parse_args_to_dict(_specs)
self.assertTrue(_mydict['my_bool'])
def test_bool_false(self):
_specs = ['--my_bool', 'type=bool', 'false', '--arg1', 'value1']
_mydict = apmecV10.parse_args_to_dict(_specs)
self.assertFalse(_mydict['my_bool'])
def test_nargs(self):
_specs = ['--tag', 'x', 'y', '--arg1', 'value1']
_mydict = apmecV10.parse_args_to_dict(_specs)
self.assertIn('x', _mydict['tag'])
self.assertIn('y', _mydict['tag'])
def test_badarg(self):
_specs = ['--tag=t', 'x', 'y', '--arg1', 'value1']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
def test_badarg_with_minus(self):
_specs = ['--arg1', 'value1', '-D']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
def test_goodarg_with_minus_number(self):
_specs = ['--arg1', 'value1', '-1', '-1.0']
_mydict = apmecV10.parse_args_to_dict(_specs)
self.assertEqual(['value1', '-1', '-1.0'],
_mydict['arg1'])
def test_badarg_duplicate(self):
_specs = ['--tag=t', '--arg1', 'value1', '--arg1', 'value1']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
def test_badarg_early_type_specification(self):
_specs = ['type=dict', 'key=value']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
def test_arg(self):
_specs = ['--tag=t', '--arg1', 'value1']
self.assertEqual('value1',
apmecV10.parse_args_to_dict(_specs)['arg1'])
def test_dict_arg(self):
_specs = ['--tag=t', '--arg1', 'type=dict', 'key1=value1,key2=value2']
arg1 = apmecV10.parse_args_to_dict(_specs)['arg1']
self.assertEqual('value1', arg1['key1'])
self.assertEqual('value2', arg1['key2'])
def test_dict_arg_with_attribute_named_type(self):
_specs = ['--tag=t', '--arg1', 'type=dict', 'type=value1,key2=value2']
arg1 = apmecV10.parse_args_to_dict(_specs)['arg1']
self.assertEqual('value1', arg1['type'])
self.assertEqual('value2', arg1['key2'])
def test_list_of_dict_arg(self):
_specs = ['--tag=t', '--arg1', 'type=dict',
'list=true', 'key1=value1,key2=value2']
arg1 = apmecV10.parse_args_to_dict(_specs)['arg1']
self.assertEqual('value1', arg1[0]['key1'])
self.assertEqual('value2', arg1[0]['key2'])
def test_clear_action(self):
_specs = ['--anyarg', 'action=clear']
args = apmecV10.parse_args_to_dict(_specs)
self.assertIsNone(args['anyarg'])
def test_bad_values_str(self):
_specs = ['--strarg', 'type=str']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
def test_bad_values_list(self):
_specs = ['--listarg', 'list=true', 'type=str']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
_specs = ['--listarg', 'type=list']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)
_specs = ['--listarg', 'type=list', 'action=clear']
self.assertRaises(exceptions.CommandError,
apmecV10.parse_args_to_dict, _specs)

View File

@ -24,11 +24,12 @@ import six.moves.urllib.parse as urlparse
import sys
import testtools
from apmecclient.apmec import v1_0 as apmecV1_0
from apmecclient.apmec.v1_0 import ApmecCommand
from apmecclient.common import constants
from apmecclient.common import exceptions
from apmecclient import shell
from apmecclient.apmec import v1_0 as apmecV1_0
from apmecclient.apmec.v1_0 import ApmecCommand
from apmecclient.tests.unit import test_utils
from apmecclient.v1_0 import client
@ -240,7 +241,7 @@ class CLITestV10Base(testtools.TestCase):
resstr = self.client.serialize(ress)
# url method body
resource_plural = apmecV1_0._get_resource_plural(resource,
self.client)
self.client)
path = getattr(self.client, resource_plural + "_path")
# Work around for LP #1217791. XML deserializer called from
# MyComparator does not decodes XML string correctly.
@ -688,8 +689,8 @@ class CLITestV10ExceptionHandler(CLITestV10Base):
error_content=None):
if error_content is None:
error_content = {'ApmecError': {'type': error_type,
'message': error_msg,
'detail': error_detail}}
'message': error_msg,
'detail': error_detail}}
e = self.assertRaises(expected_exception,
client.exception_handler_v10,

View File

@ -1,47 +0,0 @@
# Copyright 2013 NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from apmecclient.apmec.v1_0.extension import ListExt
from apmecclient.apmec.v1_0.extension import ShowExt
from apmecclient.tests.unit.test_cli10 import CLITestV10Base
from apmecclient.tests.unit.test_cli10 import MyApp
class CLITestV10Extension(CLITestV10Base):
id_field = 'alias'
def test_list_extensions(self):
resources = 'extensions'
cmd = ListExt(MyApp(sys.stdout), None)
contents = [{'alias': 'ext1', 'name': 'name1', 'other': 'other1'},
{'alias': 'ext2', 'name': 'name2', 'other': 'other2'}]
ret = self._test_list_resources(resources, cmd,
response_contents=contents)
ret_words = set(ret.split())
# Check only the default columns are shown.
self.assertIn('name', ret_words)
self.assertIn('alias', ret_words)
self.assertNotIn('other', ret_words)
def test_show_extension(self):
# -F option does not work for ext-show at the moment, so -F option
# is not passed in the commandline args as other tests do.
resource = 'extension'
cmd = ShowExt(MyApp(sys.stdout), None)
args = [self.test_id]
ext_alias = self.test_id
self._test_show_resource(resource, cmd, ext_alias, args, fields=[])

View File

@ -1,39 +0,0 @@
# Copyright 2013 Intel Corporation
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import testtools
from testtools import helpers
from apmecclient.apmec import v1_0 as apmecV10
class TestCommandMeta(testtools.TestCase):
def test_apmec_command_meta_defines_log(self):
class FakeCommand(apmecV10.ApmecCommand):
pass
self.assertTrue(helpers.safe_hasattr(FakeCommand, 'log'))
self.assertIsInstance(FakeCommand.log, logging.getLoggerClass())
self.assertEqual(FakeCommand.log.name, __name__ + ".FakeCommand")
def test_apmec_command_log_defined_explicitly(self):
class FakeCommand(apmecV10.ApmecCommand):
log = None
self.assertTrue(helpers.safe_hasattr(FakeCommand, 'log'))
self.assertIsNone(FakeCommand.log)

View File

@ -1,71 +0,0 @@
# Copyright (C) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import testtools
from apmecclient.client import HTTPClient
from apmecclient.common import exceptions
from apmecclient.tests.unit.test_cli10 import MyResp
AUTH_TOKEN = 'test_token'
END_URL = 'test_url'
METHOD = 'GET'
URL = 'http://test.test:1234/v1.0/test'
headers = {'User-Agent': 'python-apmecclient'}
class TestHTTPClient(testtools.TestCase):
def setUp(self):
super(TestHTTPClient, self).setUp()
self.addCleanup(mock.patch.stopall)
self.http = HTTPClient(token=AUTH_TOKEN, endpoint_url=END_URL)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_request_error(self, mock_request):
mock_request.side_effect = Exception('error msg')
self.assertRaises(
exceptions.ConnectionFailed,
self.http._cs_request,
URL, METHOD
)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_request_success(self, mock_request):
rv_should_be = MyResp(200), 'test content'
mock_request.return_value = rv_should_be
self.assertEqual(rv_should_be, self.http._cs_request(URL, METHOD))
@mock.patch('apmecclient.client.HTTPClient.request')
def test_request_unauthorized(self, mock_request):
mock_request.return_value = MyResp(401), 'unauthorized message'
e = self.assertRaises(exceptions.Unauthorized,
self.http._cs_request, URL, METHOD)
self.assertEqual('unauthorized message', str(e))
mock_request.assert_called_with(URL, METHOD, headers=headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_request_forbidden_is_returned_to_caller(self, mock_request):
rv_should_be = MyResp(403), 'forbidden message'
mock_request.return_value = rv_should_be
self.assertEqual(rv_should_be, self.http._cs_request(URL, METHOD))

View File

@ -1,190 +0,0 @@
# Copyright (C) 2013 Yahoo! Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import logging
import os
import re
import six
import sys
import fixtures
from keystoneclient import session
import mock
import testtools
from testtools import matchers
from apmecclient.common import clientmanager
from apmecclient import shell as openstack_shell
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v1.0/'
DEFAULT_TOKEN = '3bcc3d3a03f44e3d8377f9247b0ad155'
DEFAULT_URL = 'http://apmec.example.org:9896/'
DEFAULT_API_VERSION = '1.0'
class ShellTest(testtools.TestCase):
FAKE_ENV = {
'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_ID': DEFAULT_TENANT_ID,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_AUTH_URL}
# Patch os.environ to avoid required auth info.
def setUp(self):
super(ShellTest, self).setUp()
for var in self.FAKE_ENV:
self.useFixture(
fixtures.EnvironmentVariable(
var, self.FAKE_ENV[var]))
def shell(self, argstr, check=False):
orig = (sys.stdout, sys.stderr)
clean_env = {}
_old_env, os.environ = os.environ, clean_env.copy()
try:
sys.stdout = six.StringIO()
sys.stderr = six.StringIO()
_shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
_shell.run(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(exc_value.code, 0)
finally:
stdout = sys.stdout.getvalue()
stderr = sys.stderr.getvalue()
sys.stdout.close()
sys.stderr.close()
sys.stdout, sys.stderr = orig
os.environ = _old_env
return stdout, stderr
def test_run_unknown_command(self):
self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
stdout, stderr = self.shell('fake', check=True)
self.assertFalse(stdout)
self.assertEqual("Unknown command ['fake']", stderr.strip())
def test_help(self):
required = 'usage:'
help_text, stderr = self.shell('help')
self.assertThat(
help_text,
matchers.MatchesRegex(required))
self.assertFalse(stderr)
def test_help_on_subcommand(self):
required = [
'.*?^usage: .* mead-list']
stdout, stderr = self.shell('help mead-list')
for r in required:
self.assertThat(
stdout,
matchers.MatchesRegex(r, re.DOTALL | re.MULTILINE))
self.assertFalse(stderr)
def test_help_command(self):
required = 'usage:'
help_text, stderr = self.shell('help mead-create')
self.assertThat(
help_text,
matchers.MatchesRegex(required))
self.assertFalse(stderr)
def test_unknown_auth_strategy(self):
self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
stdout, stderr = self.shell('--os-auth-strategy fake '
'mead-list')
self.assertFalse(stdout)
def test_auth(self):
with mock.patch.object(openstack_shell.ApmecShell,
'run_subcommand'), \
mock.patch.object(session, 'Session'), \
mock.patch.object(clientmanager, 'ClientManager') as mock_cmgr:
shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
shell.options = mock.Mock()
auth_session = shell._get_keystone_session()
cmdline = ('--os-username test '
'--os-password test '
'--os-tenant-name test '
'--os-auth-url http://127.0.0.1:5000/ '
'--os-auth-strategy keystone mead-list')
shell.authenticate_user()
shell.run(cmdline.split())
mock_cmgr.assert_called_with(
raise_errors=False, retries=0, timeout=None,
token='', url='', auth_url='http://127.0.0.1:5000/',
tenant_name='test', tenant_id='tenant_id',
username='test', user_id='',
password='test', region_name='',
api_version={'mec-orchestration': '1.0'},
auth_strategy='keystone',
service_type='mec-orchestration',
endpoint_type='publicURL', insecure=False, ca_cert=None,
log_credentials=True, session=auth_session, auth=auth_session.auth)
def test_build_option_parser(self):
apmec_shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
result = apmec_shell.build_option_parser('descr', DEFAULT_API_VERSION)
self.assertIsInstance(result, argparse.ArgumentParser)
@mock.patch.object(openstack_shell.ApmecShell, 'run')
def test_main_with_unicode(self, mock_run):
mock_run.return_value = 0
unicode_text = u'\u7f51\u7edc'
argv = ['net-list', unicode_text, unicode_text.encode('utf-8')]
ret = openstack_shell.main(argv=argv)
mock_run.assert_called_once_with([u'net-list', unicode_text,
unicode_text])
self.assertEqual(0, ret)
def test_endpoint_option(self):
shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
parser = shell.build_option_parser('descr', DEFAULT_API_VERSION)
# Neither $OS_ENDPOINT_TYPE nor --endpoint-type
namespace = parser.parse_args([])
self.assertEqual('publicURL', namespace.endpoint_type)
# --endpoint-type but not $OS_ENDPOINT_TYPE
namespace = parser.parse_args(['--endpoint-type=admin'])
self.assertEqual('admin', namespace.endpoint_type)
def test_endpoint_environment_variable(self):
fixture = fixtures.EnvironmentVariable("OS_ENDPOINT_TYPE",
"public")
self.useFixture(fixture)
shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
parser = shell.build_option_parser('descr', DEFAULT_API_VERSION)
# $OS_ENDPOINT_TYPE but not --endpoint-type
namespace = parser.parse_args([])
self.assertEqual("public", namespace.endpoint_type)
# --endpoint-type and $OS_ENDPOINT_TYPE
namespace = parser.parse_args(['--endpoint-type=admin'])
self.assertEqual('admin', namespace.endpoint_type)

View File

@ -1,82 +0,0 @@
# Copyright (C) 2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
from keystoneclient import session
import mock
import requests
import testtools
from apmecclient import client
from apmecclient.common import clientmanager
from apmecclient.common import exceptions
from apmecclient import shell as openstack_shell
AUTH_TOKEN = 'test_token'
END_URL = 'test_url'
METHOD = 'GET'
URL = 'http://test.test:1234/v1.0/'
CA_CERT = '/tmp/test/path'
DEFAULT_API_VERSION = '1.0'
class TestSSL(testtools.TestCase):
def setUp(self):
super(TestSSL, self).setUp()
self.useFixture(fixtures.EnvironmentVariable('OS_TOKEN', AUTH_TOKEN))
self.useFixture(fixtures.EnvironmentVariable('OS_URL', END_URL))
self.addCleanup(mock.patch.stopall)
def _test_verify_client_manager(self, cacert):
with mock.patch.object(session, 'Session'), \
mock.patch.object(clientmanager, 'ClientManager') as mock_cmgr:
mock_cmgr.return_value = 0
shell = openstack_shell.ApmecShell(DEFAULT_API_VERSION)
shell.options = mock.Mock()
auth_session = shell._get_keystone_session()
shell.run(cacert)
mock_cmgr.assert_called_with(
api_version={'mec-orchestration': '1.0'},
auth=auth_session.auth, auth_strategy='keystone',
auth_url='', ca_cert=CA_CERT, endpoint_type='publicURL',
insecure=False, log_credentials=True, password='',
raise_errors=False, region_name='', retries=0,
service_type='mec-orchestration', session=auth_session,
tenant_id='', tenant_name='', timeout=None,
token='test_token', url='test_url', user_id='', username='')
def test_ca_cert_passed(self):
cacert = ['--os-cacert', CA_CERT]
self._test_verify_client_manager(cacert)
def test_ca_cert_passed_as_env_var(self):
self.useFixture(fixtures.EnvironmentVariable('OS_CACERT', CA_CERT))
self._test_verify_client_manager([])
@mock.patch.object(client.HTTPClient, 'request')
def test_proper_exception_is_raised_when_cert_validation_fails(self,
mock_req):
http = client.HTTPClient(token=AUTH_TOKEN, endpoint_url=END_URL)
mock_req.side_effect = requests.exceptions.SSLError()
self.assertRaises(
exceptions.SslCertificateValidationError,
http._cs_request,
URL, METHOD
)

View File

@ -1,101 +0,0 @@
# Copyright 2014 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from apmecclient.common import exceptions
from apmecclient.common import validators
class FakeParsedArgs(object):
pass
class ValidatorTest(testtools.TestCase):
def _test_validate_int(self, attr_val, attr_name='attr1',
min_value=1, max_value=10):
obj = FakeParsedArgs()
setattr(obj, attr_name, attr_val)
ret = validators.validate_int_range(obj, attr_name,
min_value, max_value)
# Come here only if there is no exception.
self.assertIsNone(ret)
def _test_validate_int_error(self, attr_val, expected_msg,
attr_name='attr1', expected_exc=None,
min_value=1, max_value=10):
if expected_exc is None:
expected_exc = exceptions.CommandError
e = self.assertRaises(expected_exc,
self._test_validate_int,
attr_val, attr_name, min_value, max_value)
self.assertEqual(expected_msg, str(e))
def test_validate_int_min_max(self):
self._test_validate_int(1)
self._test_validate_int(10)
self._test_validate_int('1')
self._test_validate_int('10')
self._test_validate_int('0x0a')
self._test_validate_int_error(
0, 'attr1 "0" should be an integer [1:10].')
self._test_validate_int_error(
11, 'attr1 "11" should be an integer [1:10].')
self._test_validate_int_error(
'0x10', 'attr1 "0x10" should be an integer [1:10].')
def test_validate_int_min_only(self):
self._test_validate_int(1, max_value=None)
self._test_validate_int(10, max_value=None)
self._test_validate_int(11, max_value=None)
self._test_validate_int_error(
0, 'attr1 "0" should be an integer greater than or equal to 1.',
max_value=None)
def test_validate_int_max_only(self):
self._test_validate_int(0, min_value=None)
self._test_validate_int(1, min_value=None)
self._test_validate_int(10, min_value=None)
self._test_validate_int_error(
11, 'attr1 "11" should be an integer smaller than or equal to 10.',
min_value=None)
def test_validate_int_no_limit(self):
self._test_validate_int(0, min_value=None, max_value=None)
self._test_validate_int(1, min_value=None, max_value=None)
self._test_validate_int(10, min_value=None, max_value=None)
self._test_validate_int(11, min_value=None, max_value=None)
self._test_validate_int_error(
'abc', 'attr1 "abc" should be an integer.',
min_value=None, max_value=None)
def _test_validate_subnet(self, attr_val, attr_name='attr1'):
obj = FakeParsedArgs()
setattr(obj, attr_name, attr_val)
ret = validators.validate_ip_subnet(obj, attr_name)
# Come here only if there is no exception.
self.assertIsNone(ret)
def test_validate_ip_subnet(self):
self._test_validate_subnet('192.168.2.0/24')
self._test_validate_subnet('192.168.2.3/20')
self._test_validate_subnet('192.168.2.1')
e = self.assertRaises(exceptions.CommandError,
self._test_validate_subnet,
'192.168.2.256')
self.assertEqual('attr1 "192.168.2.256" is not a valid CIDR.', str(e))

View File

@ -1,213 +0,0 @@
# Copyright 2014 Intel Corporation
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import mock
from apmecclient import shell
from apmecclient.apmec import v1_0 as apmecV1_0
from apmecclient.apmec.v1_0 import ApmecCommand
from apmecclient.apmec.v1_0.mem import mea
from apmecclient.tests.unit import test_cli10
from apmecclient.tests.unit import test_utils
API_VERSION = "1.0"
FORMAT = 'json'
TOKEN = 'testtoken'
ENDURL = 'localurl'
class CLITestV10VmMEAJSON(test_cli10.CLITestV10Base):
_RESOURCE = 'mea'
_RESOURCES = 'meas'
_MEA_RESOURCES = 'mea_resources'
def setUp(self):
plurals = {'meas': 'mea',
'resources': 'resource'}
super(CLITestV10VmMEAJSON, self).setUp(plurals=plurals)
@mock.patch.object(ApmecCommand, 'get_client')
def _test_create_resource(self, resource, cmd, name, myid, args,
position_names, position_values, mock_get,
tenant_id=None, tags=None, admin_state_up=True,
extra_body=None, **kwargs):
mock_get.return_value = self.client
non_admin_status_resources = ['mead', 'mea']
if (resource in non_admin_status_resources):
body = {resource: {}, }
else:
body = {resource: {'admin_state_up': admin_state_up, }, }
if tenant_id:
body[resource].update({'tenant_id': tenant_id})
if tags:
body[resource].update({'tags': tags})
if extra_body:
body[resource].update(extra_body)
body[resource].update(kwargs)
for i in range(len(position_names)):
body[resource].update({position_names[i]: position_values[i]})
ress = {resource:
{self.id_field: myid}, }
if name:
ress[resource].update({'name': name})
self.client.format = self.format
resstr = self.client.serialize(ress)
# url method body
resource_plural = apmecV1_0._get_resource_plural(resource,
self.client)
path = getattr(self.client, resource_plural + "_path")
# Work around for LP #1217791. XML deserializer called from
# MyComparator does not decodes XML string correctly.
if self.format == 'json':
_body = test_cli10.MyComparator(body, self.client)
else:
_body = self.client.serialize(body)
with mock.patch.object(self.client.httpclient, 'request') as mock_req:
mock_req.return_value = (test_cli10.MyResp(200), resstr)
args.extend(['--request-format', self.format])
args.extend(['--mead-id', 'mead'])
cmd_parser = cmd.get_parser('create_' + resource)
shell.run_command(cmd, cmd_parser, args)
mock_req.assert_called_once_with(
test_cli10.end_url(path, format=self.format), 'POST',
body=_body,
headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
mock_get.assert_any_call()
def test_create_mea_all_params(self):
cmd = mea.CreateMEA(test_cli10.MyApp(sys.stdout), None)
name = 'my_name'
my_id = 'my-id'
mead_id = 'mead'
vim_id = 'vim_id'
description = 'my-description'
region_name = 'region'
key = 'key'
value = 'value'
args = [
name,
'--mead-id', mead_id,
'--vim-id', vim_id,
'--description', description,
'--vim-region-name', region_name,
'--%s' % key, value]
position_names = [
'name',
'mead_id',
'vim_id',
'description',
'attributes',
]
position_values = [
name,
mead_id,
vim_id,
description,
{},
]
extra_body = {key: value, 'placement_attr': {'region_name':
region_name}}
self._test_create_resource(self._RESOURCE, cmd, name, my_id,
args, position_names, position_values,
extra_body=extra_body)
def test_create_mea_with_mead_id(self):
cmd = mea.CreateMEA(test_cli10.MyApp(sys.stdout), None)
name = 'my_name'
my_id = 'my-id'
mead_id = 'mead'
args = [
name,
'--mead-id', mead_id,
]
position_names = ['name', 'mead_id', 'attributes']
position_values = [name, mead_id, {}]
self._test_create_resource(self._RESOURCE, cmd, name, my_id,
args, position_names, position_values)
def test_create_mea_with_description_param(self):
cmd = mea.CreateMEA(test_cli10.MyApp(sys.stdout), None)
name = 'my_name'
my_id = 'my-id'
mead_id = 'mead'
description = 'my-description'
args = [
name,
'--mead-id', mead_id,
'--description', description,
]
position_names = ['name', 'mead_id', 'description',
'attributes']
position_values = [name, mead_id, description, {}]
self._test_create_resource(self._RESOURCE, cmd, None, my_id,
args, position_names, position_values)
def test_list_meas(self):
cmd = mea.ListMEA(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._RESOURCES, cmd, True)
def test_list_meas_pagenation(self):
cmd = mea.ListMEA(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._RESOURCES, cmd, True)
def test_show_mea_id(self):
cmd = mea.ShowMEA(test_cli10.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(self._RESOURCE, cmd, self.test_id, args,
['id'])
def test_show_mea_id_name(self):
cmd = mea.ShowMEA(test_cli10.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(self._RESOURCE, cmd, self.test_id,
args, ['id', 'name'])
def test_update_mea(self):
cmd = mea.UpdateMEA(test_cli10.MyApp(sys.stdout), None)
my_id = 'my-id'
key = 'new_key'
value = 'new-value'
self._test_update_resource(self._RESOURCE, cmd, my_id,
[my_id, '--%s' % key, value],
{key: value})
def test_delete_mea(self):
cmd = mea.DeleteMEA(test_cli10.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(self._RESOURCE, cmd, my_id, args)
def test_list_mea_resources(self):
cmd = mea.ListMEAResources(test_cli10.MyApp(sys.stdout), None)
base_args = [self.test_id]
response = [{'name': 'CP11', 'id': 'id1', 'type': 'NeutronPort'},
{'name': 'CP12', 'id': 'id2', 'type': 'NeutronPort'}]
val = self._test_list_sub_resources(self._MEA_RESOURCES, 'resources',
cmd, self.test_id,
response_contents=response,
detail=True, base_args=base_args)
self.assertIn('id1', val)
self.assertIn('NeutronPort', val)
self.assertIn('CP11', val)
def test_multi_delete_mea(self):
cmd = mea.DeleteMEA(test_cli10.MyApp(sys.stdout), None)
mea_ids = 'mea1 mea2 mea3'
args = [mea_ids]
self._test_delete_resource(self._RESOURCE, cmd, mea_ids, args)

View File

@ -1,146 +0,0 @@
# Copyright 2014 Intel Corporation
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import mock_open
from mock import patch
import sys
from apmecclient.common.exceptions import InvalidInput
from apmecclient.apmec.v1_0.mem import mead
from apmecclient.tests.unit import test_cli10
class CLITestV10VmMEADJSON(test_cli10.CLITestV10Base):
_RESOURCE = 'mead'
_RESOURCES = 'meads'
def setUp(self):
plurals = {'meads': 'mead'}
super(CLITestV10VmMEADJSON, self).setUp(plurals=plurals)
@patch("apmecclient.apmec.v1_0.mem.mead.open",
side_effect=mock_open(read_data="mead"),
create=True)
def test_create_mead_all_params(self, mo):
cmd = mead.CreateMEAD(
test_cli10.MyApp(sys.stdout), None)
my_id = 'my-id'
name = 'my-name'
attr_key = 'mead'
attr_val = 'mead'
args = [
name,
'--mead-file', 'mead-file'
]
position_names = ['name']
position_values = [name]
extra_body = {
'service_types': [{'service_type': 'mead'}],
'attributes': {attr_key: attr_val},
}
self._test_create_resource(self._RESOURCE, cmd, None, my_id,
args, position_names, position_values,
extra_body=extra_body)
@patch("apmecclient.apmec.v1_0.mem.mead.open",
side_effect=mock_open(read_data="mead"),
create=True)
def test_create_mead_with_mandatory_params(self, mo):
cmd = mead.CreateMEAD(
test_cli10.MyApp(sys.stdout), None)
name = 'my_name'
my_id = 'my-id'
args = [name, '--mead-file', 'mead-file', ]
position_names = ['name']
position_values = [name]
extra_body = {
'service_types': [{'service_type': 'mead'}],
'attributes': {'mead': 'mead'}
}
self._test_create_resource(self._RESOURCE, cmd, name, my_id,
args, position_names, position_values,
extra_body=extra_body)
@patch("apmecclient.apmec.v1_0.mem.mead.open",
side_effect=mock_open(read_data=""),
create=True)
def test_create_mead_with_empty_file(self, mo):
cmd = mead.CreateMEAD(
test_cli10.MyApp(sys.stdout), None)
name = 'my_name'
my_id = 'my-id'
args = [name, '--mead-file', 'mead-file', ]
position_names = ['name']
position_values = [name]
extra_body = {
'service_types': [{'service_type': 'mead'}],
'attributes': {'mead': 'mead'}
}
err = None
try:
self._test_create_resource(self._RESOURCE, cmd, name, my_id,
args, position_names, position_values,
extra_body=extra_body)
except InvalidInput:
err = True
self.assertEqual(True, err)
def test_list_meads(self):
cmd = mead.ListMEAD(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._RESOURCES, cmd, True,
template_source='onboarded')
def test_list_inline_meads(self):
cmd = mead.ListMEAD(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._RESOURCES, cmd, True,
template_source='inline')
def test_list_all_meads(self):
cmd = mead.ListMEAD(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._RESOURCES, cmd, True,
template_source='all')
def test_list_meads_pagenation(self):
cmd = mead.ListMEAD(test_cli10.MyApp(sys.stdout), None)
print(cmd)
self._test_list_resources(self._RESOURCES, cmd, True,
template_source='onboarded')
def test_show_mead_id(self):
cmd = mead.ShowMEAD(test_cli10.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(self._RESOURCE, cmd, self.test_id, args,
['id'])
def test_show_mead_id_name(self):
cmd = mead.ShowMEAD(test_cli10.MyApp(sys.stdout), None)
args = ['--fields', 'id', '--fields', 'name', self.test_id]
self._test_show_resource(self._RESOURCE, cmd, self.test_id,
args, ['id', 'name'])
def test_delete_mead(self):
cmd = mead.DeleteMEAD(
test_cli10.MyApp(sys.stdout), None)
my_id = 'my-id'
args = [my_id]
self._test_delete_resource(self._RESOURCE, cmd, my_id, args)
def test_multi_delete_mead(self):
cmd = mead.DeleteMEAD(
test_cli10.MyApp(sys.stdout), None)
mead_ids = 'my-id1 my-id2 my-id3'
args = [mead_ids]
self._test_delete_resource(self._RESOURCE, cmd, mead_ids, args)

View File

@ -1,69 +0,0 @@
# Copyright 2014 Intel Corporation
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from apmecclient.apmec.v1_0.events import events
from apmecclient.tests.unit import test_cli10
API_VERSION = "1.0"
FORMAT = 'json'
TOKEN = 'testtoken'
ENDURL = 'localurl'
class CLITestV10EventJSON(test_cli10.CLITestV10Base):
_EVT_RESOURCE = 'event'
_EVT_RESOURCES = _EVT_RESOURCE + 's'
_MEA_EVT_RESOURCE = "mea_event"
_MEA_EVT_RESOURCES = _MEA_EVT_RESOURCE + 's'
_MEAD_EVT_RESOURCE = "mead_event"
_MEAD_EVT_RESOURCES = _MEAD_EVT_RESOURCE + 's'
_VIM_EVT_RESOURCE = "vim_event"
_VIM_EVT_RESOURCES = _VIM_EVT_RESOURCE + 's'
def setUp(self):
plurals = {'events': 'event', 'mea_events': 'mea_event',
'mead_events': 'mead_event', 'vim_events': 'vim_event'}
super(CLITestV10EventJSON, self).setUp(plurals=plurals)
def test_list_events(self):
cmd = events.ListResourceEvents(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._EVT_RESOURCES, cmd, True)
def test_show_event_id(self):
cmd = events.ShowEvent(test_cli10.MyApp(sys.stdout), None)
args = ['--fields', 'id', self.test_id]
self._test_show_resource(self._EVT_RESOURCE, cmd, self.test_id, args,
['id'])
def notest_list_mea_events(self):
# TODO(vishwanathj): Need to enhance _test_list_resources()
# for supporting filters to get this test working
cmd = events.ListMEAEvents(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._MEA_EVT_RESOURCES, cmd, True)
def notest_list_mead_events(self):
# TODO(vishwanathj): Need to enhance _test_list_resources()
# for supporting filters to get this test working
cmd = events.ListMEADEvents(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._MEAD_EVT_RESOURCES, cmd, True)
def notest_list_vim_events(self):
# TODO(vishwanathj): Need to enhance _test_list_resources()
# for supporting filters to get this test working
cmd = events.ListVIMEvents(test_cli10.MyApp(sys.stdout), None)
self._test_list_resources(self._VIM_EVT_RESOURCES, cmd, True)

View File

@ -16,9 +16,10 @@
import sys
from apmecclient.apmec.v1_0.meo import vim
from apmecclient.common import exceptions
from apmecclient.common import utils
from apmecclient.apmec.v1_0.meo import vim
from apmecclient.tests.unit import test_cli10
API_VERSION = "1.0"

View File

@ -17,9 +17,10 @@
from mock import sentinel
import testtools
from apmecclient.common import exceptions
from apmecclient.apmec.v1_0.meo import vim_utils
from apmecclient.common import exceptions
class TestVIMUtils(testtools.TestCase):

View File

@ -71,19 +71,19 @@ def exception_handler_v10(status_code, error_content):
status_code=status_code, message=error_message)
else:
raise exceptions.ApmecClientException(status_code=status_code,
message=error_dict)
message=error_dict)
else:
message = None
if isinstance(error_content, dict):
message = error_content.get('message')
if message:
raise exceptions.ApmecClientException(status_code=status_code,
message=message)
message=message)
# If we end up here the exception was not a apmec error
msg = "%s-%s" % (status_code, error_content)
raise exceptions.ApmecClientException(status_code=status_code,
message=msg)
message=msg)
class APIParamsCall(object):
@ -344,6 +344,12 @@ class Client(ClientBase):
vims_path = '/vims'
vim_path = '/vims/%s'
mecads_path = '/mecads'
mecad_path = '/mecads/%s'
mecas_path = '/mecas'
meca_path = '/mecas/%s'
events_path = '/events'
event_path = '/events/%s'
@ -368,6 +374,7 @@ class Client(ClientBase):
_MEAD = "mead"
_MESD = "mesd"
_MECAD = "mecad"
@APIParamsCall
def list_meads(self, retrieve_all=True, **_params):
@ -496,12 +503,12 @@ class Client(ClientBase):
@APIParamsCall
def list_mesds(self, retrieve_all=True, **_params):
mesds_dict = self.list(self._MESD + 's',
self.mesds_path,
retrieve_all,
**_params)
self.mesds_path,
retrieve_all,
**_params)
for mesd in mesds_dict['mesds']:
if 'description' in mesd.keys() and \
len(mesd['description']) > DEFAULT_DESC_LENGTH:
len(mesd['description']) > DEFAULT_DESC_LENGTH:
mesd['description'] = mesd['description'][:DEFAULT_DESC_LENGTH]
mesd['description'] += '...'
return mesds_dict
@ -525,9 +532,9 @@ class Client(ClientBase):
for mes in mess['mess']:
error_reason = mes.get('error_reason', None)
if error_reason and \
len(error_reason) > DEFAULT_ERROR_REASON_LENGTH:
mes['error_reason'] = error_reason[
:DEFAULT_ERROR_REASON_LENGTH]
len(error_reason) > DEFAULT_ERROR_REASON_LENGTH:
mes['error_reason'] =\
error_reason[:DEFAULT_ERROR_REASON_LENGTH]
mes['error_reason'] += '...'
return mess
@ -541,4 +548,63 @@ class Client(ClientBase):
@APIParamsCall
def delete_mes(self, mes):
return self.delete(self.mes_path % mes)
return self.delete(self.mes_path % mes)
@APIParamsCall
def update_mes(self, mes, body):
return self.put(self.mes_path % mes, body=body)
@APIParamsCall
def list_mecads(self, retrieve_all=True, **_params):
mecads_dict = self.list(self._MECAD + 's',
self.mecads_path,
retrieve_all,
**_params)
for mecad in mecads_dict['mecads']:
if 'description' in mecad.keys() and \
len(mecad['description']) > DEFAULT_DESC_LENGTH:
mecad['description'] =\
mecad['description'][:DEFAULT_DESC_LENGTH]
mecad['description'] += '...'
return mecads_dict
@APIParamsCall
def show_mecad(self, mecad, **_params):
return self.get(self.mecad_path % mecad,
params=_params)
@APIParamsCall
def create_mecad(self, body):
return self.post(self.mecads_path, body)
@APIParamsCall
def delete_mecad(self, mecad):
return self.delete(self.mecad_path % mecad)
@APIParamsCall
def list_mecas(self, retrieve_all=True, **_params):
mecas = self.list('mecas', self.mecas_path, retrieve_all, **_params)
for meca in mecas['mecas']:
error_reason = meca.get('error_reason', None)
if error_reason and \
len(error_reason) > DEFAULT_ERROR_REASON_LENGTH:
meca['error_reason'] =\
error_reason[:DEFAULT_ERROR_REASON_LENGTH]
meca['error_reason'] += '...'
return mecas
@APIParamsCall
def show_meca(self, meca, **_params):
return self.get(self.meca_path % meca, params=_params)
@APIParamsCall
def create_meca(self, body):
return self.post(self.mecas_path, body=body)
@APIParamsCall
def delete_meca(self, meca):
return self.delete(self.meca_path % meca)
@APIParamsCall
def update_meca(self, meca, body):
return self.put(self.meca_path % meca, body=body)

View File

@ -54,6 +54,7 @@ rfc3986==0.3.1
simplejson==3.5.1
six==1.10.0
stevedore==1.20.0
stestr==2.0.0
testrepository==0.0.18
testtools==2.2.0
traceback2==1.4.0

View File

@ -13,7 +13,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD
testtools>=1.4.0 # MIT
oslosphinx>=4.7.0 # Apache-2.0
openstackdocstheme>=1.17.0 # Apache-2.0
stestr>=2.0.0 # Apache-2.0
# releasenotes
reno>=2.5.0 # Apache-2.0
mock>=2.0.0 # BSD

19
tox.ini
View File

@ -1,5 +1,5 @@
[tox]
envlist = py35,py27,pypy,pep8
envlist = py35,py27,pep8
minversion = 2.0
skipsdist = True
@ -11,10 +11,12 @@ setenv = VIRTUAL_ENV={envdir}
LANGUAGE=en_US:en
LC_ALL=C
usedevelop = True
install_command = {toxinidir}/tools/tox_install.sh {env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
deps = -r{toxinidir}/requirements.txt
install_command = pip install {opts} {packages}
deps =
-c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt?h=stable/rocky}
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --testr-args='{posargs}'
commands = stestr run --slowest {posargs}
[testenv:pep8]
commands = flake8
@ -27,8 +29,13 @@ commands = {posargs}
commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
[testenv:cover]
commands = python setup.py testr --coverage --testr-args='{posargs}'
setenv =
PYTHON=coverage run --source apmecclient --parallel-mode
commands =
stestr run {posargs}
coverage combine
coverage html -d cover
coverage xml -o cover/coverage.xml
[flake8]
# E125 continuation line does not distinguish itself from next logical line
ignore = E125