477 lines
18 KiB
Python
477 lines
18 KiB
Python
# Copyright (C) 2022 Fujitsu
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import ddt
|
|
import os
|
|
import sys
|
|
|
|
from io import StringIO
|
|
from oslo_utils.fixture import uuidsentinel
|
|
from unittest import mock
|
|
|
|
from tackerclient.common import exceptions
|
|
from tackerclient.osc import utils as tacker_osc_utils
|
|
from tackerclient.osc.v2.vnfpm import vnfpm_job
|
|
from tackerclient.tests.unit.osc import base
|
|
from tackerclient.tests.unit.osc.v1.fixture_data import client
|
|
from tackerclient.tests.unit.osc.v2 import vnfpm_job_fakes
|
|
|
|
|
|
class TestVnfPmJob(base.FixturedTestCase):
    """Shared fixture for the ``vnfpm job`` OSC command tests.

    Attaches the fixtured client to a mocked application object so that
    the commands under test find it at ``app.client_manager.tackerclient``.
    """

    client_fixture_class = client.ClientFixture

    def setUp(self):
        """Prepare the URL, response header and mocked app for each test."""
        super().setUp()
        # ``self.cs`` is not defined in this class; presumably the base
        # fixture's setUp() provides it -- confirm against ``base``.
        self.client_manager = self.cs
        self.url = client.TACKER_URL
        self.header = {'content-type': 'application/json'}

        self.app_args = mock.Mock()
        self.app = mock.Mock()
        self.app.client_manager.tackerclient = self.client_manager
|
|
|
|
|
|
def _get_columns_vnfpm_job(action=None):
|
|
if action == 'update':
|
|
columns = ['Callback Uri']
|
|
else:
|
|
columns = ['ID', 'Object Type', 'Object Instance Ids',
|
|
'Sub Object Instance Ids', 'Criteria', 'Callback Uri',
|
|
'Reports', 'Links']
|
|
if action == 'list':
|
|
columns = [
|
|
ele for ele in columns if ele not in [
|
|
'Criteria', 'Sub Object Instance Ids', 'Reports', 'Links'
|
|
]
|
|
]
|
|
return columns
|
|
|
|
|
|
class TestCreateVnfPmJob(TestVnfPmJob):
    """Tests for the ``vnfpm job create`` command."""

    def setUp(self):
        """Instantiate the create command against the mocked app."""
        super().setUp()
        self.create_vnf_pm_job = vnfpm_job.CreateVnfPmJob(
            self.app, self.app_args, cmd_name='vnfpm job create')

    def test_create_no_args(self):
        """Parsing an empty argument list raises ParserException."""
        self.assertRaises(base.ParserException, self.check_parser,
                          self.create_vnf_pm_job, [], [])

    def test_take_action(self):
        """take_action() returns the columns and data of the created job."""
        param_file = ("./tackerclient/osc/v2/vnfpm/samples/"
                      "create_vnf_pm_job_param_sample.json")

        arg_list = [param_file]
        verify_list = [('request_file', param_file)]

        parsed_args = self.check_parser(self.create_vnf_pm_job, arg_list,
                                        verify_list)

        response_body = vnfpm_job_fakes.vnf_pm_job_response()
        # Plain concatenation (as in the list tests of this module) keeps
        # the URL independent of the platform's path separator.
        self.requests_mock.register_uri(
            'POST', self.url + '/vnfpm/v2/pm_jobs',
            json=response_body, headers=self.header)

        actual_columns, data = (
            self.create_vnf_pm_job.take_action(parsed_args))
        self.assertCountEqual(_get_columns_vnfpm_job(),
                              actual_columns)

        _, attributes = vnfpm_job._get_columns(response_body)
        expected_data = vnfpm_job_fakes.get_vnfpm_job_data(
            response_body, columns=attributes)
        self.assertListItemsEqual(expected_data, data)
|
|
|
|
|
|
@ddt.ddt
class TestListVnfPmJob(TestVnfPmJob):
    """Tests for the ``vnfpm job list`` command."""

    def setUp(self):
        """Instantiate the list command and a default set of fake jobs."""
        super().setUp()
        self.list_vnf_pm_jobs = vnfpm_job.ListVnfPmJob(
            self.app, self.app_args, cmd_name='vnfpm job list')
        self._vnf_pm_jobs = self._get_vnf_pm_jobs()

    def _get_vnf_pm_jobs(self):
        """Build a fake list response through the fakes module (count=3)."""
        return vnfpm_job_fakes.create_vnf_pm_jobs(count=3)

    def get_list_columns(self, all_fields=False, exclude_fields=None,
                         extra_fields=None, exclude_default=False):
        """Return the column headers expected for the given list options.

        :param all_fields: append every complex column and the simple one.
        :param exclude_fields: append the complex columns that are not
            named in this collection.
        :param extra_fields: column headers appended as given.
        :param exclude_default: append the simple column.
        :returns: list of column header strings.
        """
        complex_headers = [
            'Object Instance Ids',
            'Sub Object Instance Ids',
            'Criteria',
            'Reports'
        ]
        simple_headers = ['Callback Uri']
        headers = ['Id', 'Object Type', 'Links']

        if extra_fields:
            headers += extra_fields

        if exclude_fields:
            headers += [name for name in complex_headers
                        if name not in exclude_fields]

        if all_fields:
            headers += complex_headers
            headers += simple_headers

        if exclude_default:
            headers += simple_headers

        return headers

    def _get_mock_response_for_list_vnf_pm_jobs(
            self, filter_attribute, json=None):
        """Register a GET mock for the list URL with a query string.

        :param filter_attribute: query string appended after ``?``.
        :param json: response body; freshly generated fakes are used when
            it is falsy.
        """
        body = json or self._get_vnf_pm_jobs()
        list_url = self.url + '/vnfpm/v2/pm_jobs?' + filter_attribute
        self.requests_mock.register_uri(
            'GET', list_url, json=body, headers=self.header)

    def _expected_rows(self, jobs, **attribute_kwargs):
        """Return the rows expected for ``jobs`` with the given options.

        ``attribute_kwargs`` is forwarded to the command's
        ``get_attributes()`` to select the columns.
        """
        _, attributes = tacker_osc_utils.get_column_definitions(
            self.list_vnf_pm_jobs.get_attributes(**attribute_kwargs),
            long_listing=True)
        return [
            vnfpm_job_fakes.get_vnfpm_job_data(job, columns=attributes)
            for job in jobs
        ]

    def test_take_action_default_fields(self):
        """Listing without options yields the default columns."""
        parsed_args = self.check_parser(self.list_vnf_pm_jobs, [], [])
        self.requests_mock.register_uri(
            'GET', self.url + '/vnfpm/v2/pm_jobs',
            json=self._vnf_pm_jobs, headers=self.header)

        actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)

        expected_data = self._expected_rows(
            self._vnf_pm_jobs['vnf_pm_jobs'])
        self.assertCountEqual(self.get_list_columns(), actual_columns)
        self.assertListItemsEqual(expected_data, list(data))

    @ddt.data('all_fields', 'exclude_default')
    def test_take_action(self, arg):
        """Listing with a boolean option plus a filter."""
        job_filter = '(eq,objectType,VNFC)'
        parsed_args = self.check_parser(
            self.list_vnf_pm_jobs,
            ["--" + arg, "--filter", job_filter],
            [(arg, True), ('filter', job_filter)])
        jobs = self._get_vnf_pm_jobs()
        self._get_mock_response_for_list_vnf_pm_jobs(
            'filter=(eq,objectType,VNFC)&' + arg, json=jobs)

        actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)

        option = {arg: True}
        expected_data = self._expected_rows(jobs['vnf_pm_jobs'], **option)

        self.assertCountEqual(self.get_list_columns(**option),
                              actual_columns)
        self.assertListItemsEqual(expected_data, list(data))

    def test_take_action_with_exclude_fields(self):
        """Listing with ``--exclude_fields`` plus a filter."""
        excluded = 'objectInstanceIds,criteria'
        job_filter = '(eq,objectType,VNFC)'
        parsed_args = self.check_parser(
            self.list_vnf_pm_jobs,
            ["--exclude_fields", excluded, "--filter", job_filter],
            [('exclude_fields', excluded), ('filter', job_filter)])

        jobs = self._get_vnf_pm_jobs()
        # Strip the excluded attributes from the fake response body.
        for job in jobs['vnf_pm_jobs']:
            del job['objectInstanceIds']
            del job['criteria']
        trimmed_jobs = {'vnf_pm_jobs': list(jobs['vnf_pm_jobs'])}

        self._get_mock_response_for_list_vnf_pm_jobs(
            'filter=(eq,objectType,VNFC)&'
            'exclude_fields=objectInstanceIds,criteria',
            json=trimmed_jobs)

        actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)

        expected_data = self._expected_rows(
            trimmed_jobs['vnf_pm_jobs'],
            exclude_fields=['objectInstanceIds', 'criteria'])
        expected_columns = self.get_list_columns(
            exclude_fields=['Object Instance Ids', 'Criteria'])
        self.assertCountEqual(expected_columns, actual_columns)
        self.assertListItemsEqual(expected_data, list(data))

    @ddt.data((['--all_fields', '--fields', 'objectInstanceIds'],
               [('all_fields', True), ('fields', 'objectInstanceIds')]),
              (['--all_fields', '--exclude_fields', 'criteria'],
               [('all_fields', True), ('exclude_fields', 'criteria')]),
              (['--fields', 'objectInstanceIds',
                '--exclude_fields', 'criteria'],
               [('fields', 'objectInstanceIds'),
                ('exclude_fields', 'criteria')]))
    @ddt.unpack
    def test_take_action_with_invalid_combination(self, arglist, verifylist):
        """These option combinations raise ParserException when parsed."""
        self.assertRaises(base.ParserException, self.check_parser,
                          self.list_vnf_pm_jobs, arglist, verifylist)

    def test_take_action_with_valid_combination(self):
        """``--fields`` together with ``--exclude_default`` is accepted."""
        requested = 'subObjectInstanceIds,criteria'
        parsed_args = self.check_parser(
            self.list_vnf_pm_jobs,
            ["--fields", requested, "--exclude_default"],
            [('fields', requested), ('exclude_default', True)])

        jobs = self._get_vnf_pm_jobs()
        response_jobs = {'vnf_pm_jobs': list(jobs['vnf_pm_jobs'])}

        self._get_mock_response_for_list_vnf_pm_jobs(
            'exclude_default&fields=subObjectInstanceIds,criteria',
            json=response_jobs)

        actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)

        expected_data = self._expected_rows(
            response_jobs['vnf_pm_jobs'],
            extra_fields=['subObjectInstanceIds', 'criteria'],
            exclude_default=True)

        expected_columns = self.get_list_columns(
            extra_fields=['Sub Object Instance Ids', 'Criteria'],
            exclude_default=True)
        self.assertCountEqual(expected_columns, actual_columns)
        self.assertListItemsEqual(expected_data, list(data))
|
|
|
|
|
|
class TestShowVnfPmJob(TestVnfPmJob):
    """Tests for the ``vnfpm job show`` command."""

    def setUp(self):
        """Instantiate the show command against the mocked app."""
        super().setUp()
        self.show_vnf_pm_jobs = vnfpm_job.ShowVnfPmJob(
            self.app, self.app_args, cmd_name='vnfpm job show')

    def _parse_show_args(self, job_id):
        """Parse ``job_id`` as the command's only positional argument."""
        return self.check_parser(
            self.show_vnf_pm_jobs, [job_id], [('vnf_pm_job_id', job_id)])

    def _assert_show_raises_for_status(self, status_code):
        """Assert take_action() raises when the API answers status_code."""
        job_id = uuidsentinel.vnf_pm_job_id
        parsed_args = self._parse_show_args(job_id)

        job_url = os.path.join(self.url, 'vnfpm/v2/pm_jobs', job_id)
        self.requests_mock.register_uri(
            'GET', job_url, headers=self.header,
            status_code=status_code, json={})

        self.assertRaises(exceptions.TackerClientException,
                          self.show_vnf_pm_jobs.take_action,
                          parsed_args)

    def test_take_action(self):
        """Test of take_action()."""
        fake_job = vnfpm_job_fakes.vnf_pm_job_response()
        job_id = fake_job['id']

        parsed_args = self._parse_show_args(job_id)

        job_url = os.path.join(self.url, 'vnfpm/v2/pm_jobs', job_id)
        self.requests_mock.register_uri(
            'GET', job_url, headers=self.header, json=fake_job)

        columns, data = self.show_vnf_pm_jobs.take_action(parsed_args)

        self.assertCountEqual(_get_columns_vnfpm_job('show'), columns)

        _, attributes = vnfpm_job._get_columns(fake_job)
        expected_data = vnfpm_job_fakes.get_vnfpm_job_data(
            fake_job, columns=attributes)
        self.assertListItemsEqual(expected_data, data)

    def test_take_action_vnf_pm_job_id_not_found(self):
        """Test that a 404 response for the job ID raises an error."""
        self._assert_show_raises_for_status(404)

    def test_take_action_internal_server_error(self):
        """Test that a 500 response raises an error."""
        self._assert_show_raises_for_status(500)
|
|
|
|
|
|
@ddt.ddt
class TestUpdateVnfPmJob(TestVnfPmJob):
    """Tests for the ``vnfpm job update`` command."""

    def setUp(self):
        """Instantiate the update command against the mocked app."""
        super().setUp()
        self.update_vnf_pm_job = vnfpm_job.UpdateVnfPmJob(
            self.app, self.app_args, cmd_name='vnfpm job update')

    def _parse_update_args(self):
        """Parse the sentinel job ID together with the sample request file."""
        request_file = ("./tackerclient/osc/v2/vnfpm/samples/"
                        "update_vnf_pm_job_param_sample.json")
        job_id = uuidsentinel.vnf_pm_job_id
        return self.check_parser(
            self.update_vnf_pm_job,
            [job_id, request_file],
            [('vnf_pm_job_id', job_id), ('request_file', request_file)])

    def _register_patch_response(self, **response):
        """Register a PATCH mock for the sentinel job's URL.

        ``response`` is forwarded to ``register_uri`` as keyword arguments.
        """
        job_url = os.path.join(
            self.url, 'vnfpm/v2/pm_jobs', uuidsentinel.vnf_pm_job_id)
        self.requests_mock.register_uri(
            'PATCH', job_url, headers=self.header, **response)

    def test_take_action(self):
        """Test of take_action()."""
        fake_job = vnfpm_job_fakes.vnf_pm_job_response(None, 'update')

        parsed_args = self._parse_update_args()
        self._register_patch_response(json=fake_job)

        actual_columns, data = (
            self.update_vnf_pm_job.take_action(parsed_args))
        self.assertCountEqual(_get_columns_vnfpm_job(action='update'),
                              actual_columns)

        _, attributes = vnfpm_job._get_columns(fake_job, action='update')
        expected_data = vnfpm_job_fakes.get_vnfpm_job_data(
            fake_job, columns=attributes)
        self.assertEqual(expected_data, data)

    def test_take_action_vnf_pm_job_id_not_found(self):
        """Test that a 404 response for the job ID raises an error."""
        parsed_args = self._parse_update_args()
        self._register_patch_response(status_code=404, json={})

        self.assertRaises(exceptions.TackerClientException,
                          self.update_vnf_pm_job.take_action,
                          parsed_args)
|
|
|
|
|
|
class TestDeleteVnfPmJob(TestVnfPmJob):
    """Tests for the ``vnfpm job delete`` command."""

    def setUp(self):
        """Instantiate the delete command and the fake jobs to delete."""
        super().setUp()
        self.delete_vnf_pm_job = vnfpm_job.DeleteVnfPmJob(
            self.app, self.app_args, cmd_name='vnfpm job delete')

        # VNF PM jobs to delete
        self.vnf_pm_jobs = vnfpm_job_fakes.create_vnf_pm_jobs(
            count=3)['vnf_pm_jobs']

    def _job_url(self, job_id):
        """Return the URL of a single PM job.

        Built by plain string formatting, like the list tests of this
        module, so it does not depend on the platform path separator.
        """
        return f"{self.url}/vnfpm/v2/pm_jobs/{job_id}"

    def _mock_request_url_for_delete(self, index):
        """Register a DELETE mock for ``self.vnf_pm_jobs[index]``."""
        self.requests_mock.register_uri(
            'DELETE', self._job_url(self.vnf_pm_jobs[index]['id']),
            headers=self.header, json={})

    def _take_action_capturing_stdout(self, parsed_args):
        """Run take_action() and return ``(result, stripped stdout text)``.

        ``sys.stdout`` is swapped only for the duration of the call and is
        restored afterwards, even when take_action() raises.
        """
        captured = StringIO()
        with mock.patch.object(sys, 'stdout', captured):
            result = self.delete_vnf_pm_job.take_action(parsed_args)
        return result, captured.getvalue().strip()

    def test_delete_one_vnf_pm_job(self):
        """Deleting one job prints a message naming that job."""
        job_id = self.vnf_pm_jobs[0]['id']
        parsed_args = self.check_parser(
            self.delete_vnf_pm_job, [job_id], [('vnf_pm_job_id', [job_id])])

        self._mock_request_url_for_delete(0)
        result, output = self._take_action_capturing_stdout(parsed_args)

        self.assertIsNone(result)
        self.assertEqual(
            f"VNF PM job '{job_id}' deleted successfully", output)

    def test_delete_multiple_vnf_pm_job(self):
        """Deleting every fake job prints the summary message."""
        arg_list = [job['id'] for job in self.vnf_pm_jobs]
        verify_list = [('vnf_pm_job_id', arg_list)]
        parsed_args = self.check_parser(self.delete_vnf_pm_job, arg_list,
                                        verify_list)
        for index in range(len(self.vnf_pm_jobs)):
            self._mock_request_url_for_delete(index)

        result, output = self._take_action_capturing_stdout(parsed_args)

        self.assertIsNone(result)
        self.assertEqual('All specified VNF PM jobs are deleted '
                         'successfully', output)

    def test_delete_multiple_vnf_pm_job_exception(self):
        """One failing deletion out of three raises CommandError."""
        unknown_job_id = 'xxxx-yyyy-zzzz'
        arg_list = [
            self.vnf_pm_jobs[0]['id'],
            unknown_job_id,
            self.vnf_pm_jobs[1]['id'],
        ]
        verify_list = [('vnf_pm_job_id', arg_list)]
        parsed_args = self.check_parser(self.delete_vnf_pm_job,
                                        arg_list, verify_list)

        self._mock_request_url_for_delete(0)
        # The failing request must be mocked with the same method and path
        # as the successful deletions, otherwise the mock is never matched.
        # NOTE(review): assumes the command counts a raised
        # ConnectionFailed as one failed deletion -- confirm.
        self.requests_mock.register_uri(
            'DELETE', self._job_url(unknown_job_id),
            exc=exceptions.ConnectionFailed)
        self._mock_request_url_for_delete(1)

        exception = self.assertRaises(exceptions.CommandError,
                                      self.delete_vnf_pm_job.take_action,
                                      parsed_args)

        self.assertEqual('Failed to delete 1 of 3 VNF PM jobs.',
                         exception.message)
|