Use requests-mock for old tests

In order to be able to change HTTP client easily
tests must not mock requests module themselves but
use requests-mock instead.

Since ols CLI is going to be removed there is no need in
creating a common base test class. This patch separates
base classes for old and new tests and utilizes requests-mock
for mocking HTTP client properly.

Change-Id: Ia0996cb07e6622306f7b9e2350d9e56344004e61
Related-bug: #1459964
This commit is contained in:
Roman Prykhodchenko 2015-10-02 09:17:21 +02:00
parent 56fbd6bad7
commit 7b387ec229
32 changed files with 497 additions and 542 deletions

View File

@ -1,13 +0,0 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -11,13 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from fuelclient.objects import Release
try:
from unittest.case import TestCase
except ImportError:
# Runing unit-tests in production environment
from unittest2.case import TestCase
import logging
import os
@ -26,11 +19,13 @@ import subprocess
import sys
import tempfile
try:
from unittest.case import TestCase
except ImportError:
# Runing unit-tests in production environment all
from unittest2.case import TestCase
import mock
from six import StringIO
from fuelclient.cli.parser import main
from fuelclient.objects import Release
logging.basicConfig(stream=sys.stderr)
@ -38,21 +33,7 @@ log = logging.getLogger("CliTest.ExecutionLog")
log.setLevel(logging.DEBUG)
class FakeFile(StringIO):
"""It's a fake file which returns StringIO
when file opens with 'with' statement.
NOTE(eli): We cannot use mock_open from mock library
here, because it hangs when we use 'with' statement,
and when we want to read file by chunks.
"""
def __enter__(self):
return self
def __exit__(self, *args):
pass
class CliExectutionResult:
class CliExectutionResult(object):
def __init__(self, process_handle, out, err):
self.return_code = process_handle.returncode
self.stdout = out
@ -67,47 +48,16 @@ class CliExectutionResult:
return self.return_code == 0
class UnitTestCase(TestCase):
"""Base test class which does not require nailgun server to run."""
def setUp(self):
"""Mocks keystone authentication."""
self.auth_required_patcher = mock.patch(
'fuelclient.client.Client.auth_required',
new_callable=mock.PropertyMock
)
self.auth_required_mock = self.auth_required_patcher.start()
self.auth_required_mock.return_value = False
super(UnitTestCase, self).setUp()
def tearDown(self):
super(UnitTestCase, self).tearDown()
self.auth_required_patcher.stop()
def execute(self, command):
return main(command)
def mock_open(self, text, filename='some.file'):
"""Mocks builtin open function.
Usage example:
with mock.patch('__builtin__.open', self.mock_open('file content')):
# call mocked code
"""
fileobj = FakeFile(text)
setattr(fileobj, 'name', filename)
return mock.MagicMock(return_value=fileobj)
class BaseTestCase(UnitTestCase):
class BaseTestCase(TestCase):
nailgun_root = os.environ.get('NAILGUN_ROOT', '/tmp/fuel_web/nailgun')
def setUp(self):
super(BaseTestCase, self).setUp()
self.reload_nailgun_server()
self.temp_directory = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.temp_directory)
self.addCleanup(shutil.rmtree, self.temp_directory)
@staticmethod
def run_command(*args, **kwargs):

View File

@ -17,7 +17,7 @@
import os
import tempfile
from fuelclient.tests import base
from fuelclient.tests.functional import base
class TestHandlers(base.BaseTestCase):

View File

@ -16,11 +16,10 @@
import json
import mock
import requests_mock
import six
import yaml
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
YAML_TEMPLATE = """adv_net_template:
default:
@ -109,14 +108,8 @@ class TestNetworkTemplate(base.UnitTestCase):
self.req_path = ('/api/v1/clusters/{0}/network_configuration/'
'template'.format(self.env_id))
self.mocker = requests_mock.Mocker()
self.mocker.start()
def tearDown(self):
self.mocker.stop()
def test_upload_action(self):
mput = self.mocker.put(self.req_path, json={})
mput = self.m_request.put(self.req_path, json={})
test_command = [
'fuel', 'network-template', '--env', str(self.env_id), '--upload']
@ -131,7 +124,7 @@ class TestNetworkTemplate(base.UnitTestCase):
m_open().read.assert_called_once_with()
def test_download_action(self):
mget = self.mocker.get(self.req_path, text=JSON_TEMPLATE)
mget = self.m_request.get(self.req_path, text=JSON_TEMPLATE)
test_command = [
'fuel', 'network-template', '--env', str(self.env_id),
@ -149,7 +142,7 @@ class TestNetworkTemplate(base.UnitTestCase):
self.assertEqual(written_yaml, expected_yaml)
def test_delete_action(self):
mdelete = self.mocker.delete(self.req_path, json={})
mdelete = self.m_request.delete(self.req_path, json={})
cmd = ['fuel', 'network-template', '--env', str(self.env_id),
'--delete']

View File

@ -20,7 +20,7 @@ import yaml
from fuelclient.cli import error
from fuelclient.cli.serializers import Serializer
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
class TestSerializers(base.UnitTestCase):
@ -59,8 +59,9 @@ class TestSerializers(base.UnitTestCase):
def test_write_to_path_invalid_file_exception(self):
serializer = Serializer('json')
with self.assertRaises(error.InvalidFileException):
mo = mock.mock_open()
with mock.patch('__main__.open', mo, create=True) as mocked_open:
mocked_open.side_effect = IOError()
serializer.write_to_path('/foo/bar/baz', self.DATA)
mo = mock.mock_open()
with mock.patch('__main__.open', mo, create=True) as mocked_open:
mocked_open.side_effect = IOError()
self.assertRaises(error.InvalidFileException,
serializer.write_to_path,
'/foo/bar/baz', self.DATA)

View File

@ -14,13 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from mock import Mock
from mock import mock_open
from mock import patch
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
YAML_SETTINGS_DATA = """editable:
@ -41,98 +38,75 @@ JSON_SETTINGS_DATA = {
class BaseSettings(base.UnitTestCase):
def check_upload_action(self, mrequests, test_command, test_url):
def check_upload_action(self, test_command, test_url):
m = mock_open(read_data=YAML_SETTINGS_DATA)
put = self.m_request.put(test_url, json={})
with patch('six.moves.builtins.open', m, create=True):
self.execute(test_command)
request = mrequests.put.call_args_list[0]
url = request[0][0]
data = request[1]['data']
m().read.assert_called_once_with()
self.assertEqual(mrequests.put.call_count, 1)
self.assertIn(test_url, url)
self.assertDictEqual(json.loads(data), JSON_SETTINGS_DATA)
self.assertTrue(put.called)
self.assertDictEqual(put.last_request.json(), JSON_SETTINGS_DATA)
def check_default_action(self, mrequests, test_command, test_url):
def check_default_action(self, test_command, test_url):
m = mock_open()
mresponse = Mock(status_code=200)
mresponse.json.return_value = JSON_SETTINGS_DATA
mrequests.get.return_value = mresponse
get = self.m_request.get(test_url, json=JSON_SETTINGS_DATA)
with patch('six.moves.builtins.open', m, create=True):
self.execute(test_command)
request = mrequests.get.call_args_list[0]
url = request[0][0]
self.assertTrue(get.called)
m().write.assert_called_once_with(YAML_SETTINGS_DATA)
self.assertEqual(mrequests.get.call_count, 1)
self.assertIn(test_url, url)
def check_download_action(self, mrequests, test_command, test_url):
def check_download_action(self, test_command, test_url):
m = mock_open()
mresponse = Mock(status_code=200)
mresponse.json.return_value = JSON_SETTINGS_DATA
mrequests.get.return_value = mresponse
get = self.m_request.get(test_url, json=JSON_SETTINGS_DATA)
with patch('six.moves.builtins.open', m, create=True):
self.execute(test_command)
request = mrequests.get.call_args_list[0]
url = request[0][0]
m().write.assert_called_once_with(YAML_SETTINGS_DATA)
self.assertEqual(mrequests.get.call_count, 1)
self.assertIn(test_url, url)
self.assertTrue(get.called)
@patch('fuelclient.client.requests')
class TestSettings(BaseSettings):
def test_upload_action(self, mrequests):
def test_upload_action(self):
self.check_upload_action(
mrequests=mrequests,
test_command=[
'fuel', 'settings', '--env', '1', '--upload'],
test_url='/api/v1/clusters/1/attributes')
def test_default_action(self, mrequests):
def test_default_action(self):
self.check_default_action(
mrequests=mrequests,
test_command=[
'fuel', 'settings', '--env', '1', '--default'],
test_url='/api/v1/clusters/1/attributes/default')
test_url='/api/v1/clusters/1/attributes/defaults')
def test_download_action(self, mrequests):
def test_download_action(self):
self.check_download_action(
mrequests=mrequests,
test_command=[
'fuel', 'settings', '--env', '1', '--download'],
test_url='/api/v1/clusters/1/attributes')
@patch('fuelclient.client.requests')
class TestVmwareSettings(BaseSettings):
def test_upload_action(self, mrequests):
def test_upload_action(self):
self.check_upload_action(
mrequests=mrequests,
test_command=[
'fuel', 'vmware-settings', '--env', '1', '--upload'],
test_url='/api/v1/clusters/1/vmware_attributes')
def test_default_action(self, mrequests):
def test_default_action(self):
self.check_default_action(
mrequests=mrequests,
test_command=[
'fuel', 'vmware-settings', '--env', '1', '--default'],
test_url='/api/v1/clusters/1/vmware_attributes/default')
test_url='/api/v1/clusters/1/vmware_attributes/defaults')
def test_download_action(self, mrequests):
def test_download_action(self):
self.check_download_action(
mrequests=mrequests,
test_command=[
'fuel', 'vmware-settings', '--env', '1', '--download'],
test_url='/api/v1/clusters/1/vmware_attributes')

View File

@ -24,7 +24,7 @@ import requests
from fuelclient.cli import error
from fuelclient.common import data_utils
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient import utils

View File

@ -0,0 +1,81 @@
# Copyright 2013-2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests_mock as rm
import six
try:
from unittest.case import TestCase
except ImportError:
# Runing unit-tests in production environment all
from unittest2.case import TestCase
from fuelclient.cli import parser
class FakeFile(six.StringIO):
"""Context manager for a fake file
NOTE(eli): We cannot use mock_open from mock library
here, because it hangs when we use 'with' statement,
and when we want to read file by chunks.
"""
def __enter__(self):
return self
def __exit__(self, *args):
pass
class UnitTestCase(TestCase):
"""Base test class which does not require nailgun server to run."""
def setUp(self):
super(UnitTestCase, self).setUp()
self.auth_required_patcher = mock.patch('fuelclient.client.'
'Client.auth_required',
new_callable=mock.PropertyMock)
self.auth_required_mock = self.auth_required_patcher.start()
self.auth_required_mock.return_value = False
self.m_request = rm.Mocker()
self.m_request.start()
self.top_matcher = self.m_request.register_uri(rm.ANY,
rm.ANY,
json={})
self.addCleanup(self.auth_required_patcher.stop)
self.addCleanup(self.m_request.stop)
def execute(self, command):
"""Execute old CLI."""
return parser.main(command)
def mock_open(self, text, filename='some.file'):
"""Mocks builtin open function
Usage example:
with mock.patch('__builtin__.open', self.mock_open('file content')):
# call mocked code
"""
fileobj = FakeFile(text)
setattr(fileobj, 'name', filename)
return mock.MagicMock(return_value=fileobj)

View File

@ -14,14 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from mock import patch
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.actions.base.os')
class TestClusterAttributesActions(base.UnitTestCase):
@ -33,25 +30,23 @@ class TestClusterAttributesActions(base.UnitTestCase):
_output = 'editable:\n test: foo\n'
def test_attributes_download(self, mos, mopen, mrequests):
mrequests.get().json.return_value = self._input
def test_attributes_download(self, mos, mopen):
get = self.m_request.get('/api/v1/clusters/1/attributes',
json=self._input)
self.execute(
['fuel', 'env', '--env', '1', '--attributes', '--download'])
url = mrequests.get.call_args[0][0]
self.assertIn('clusters/1/attributes', url)
self.assertTrue(get.called)
mopen().__enter__().write.assert_called_once_with(self._output)
def test_attributes_upload(self, mos, mopen, mrequests):
def test_attributes_upload(self, mos, mopen):
mopen().__enter__().read.return_value = self._output
put = self.m_request.put('/api/v1/clusters/1/attributes', json={})
self.execute(
['fuel', 'env', '--env', '1', '--attributes', '--upload'])
self.assertEqual(mrequests.put.call_count, 1)
call_args = mrequests.put.call_args_list[0]
url = call_args[0][0]
kwargs = call_args[1]
self.assertIn('clusters/1/attributes', url)
self.assertEqual(json.loads(kwargs['data']), self._input)
self.assertTrue(put.called)
self.assertEqual(put.last_request.json(), self._input)

View File

@ -19,7 +19,7 @@ from mock import patch
from six.moves import urllib
from fuelclient import fuelclient_settings
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
class TestAuthentication(base.UnitTestCase):
@ -43,11 +43,9 @@ class TestAuthentication(base.UnitTestCase):
self.assertEqual(int(conf.LISTEN_PORT), int(pr.port))
self.assertEqual('/keystone/v2.0', pr.path)
@patch('fuelclient.client.requests')
@patch('fuelclient.client.auth_client')
def test_credentials(self, mkeystone_cli, mrequests):
def test_credentials(self, mkeystone_cli):
mkeystone_cli.return_value = Mock(auth_token='')
mrequests.get_request.return_value = Mock(status_code=200)
self.execute(
['fuel', '--user=a', '--password=b', 'node'])
self.validate_credentials_response(

View File

@ -17,12 +17,11 @@
import json
import mock
import requests_mock
import yaml
from fuelclient.cli.actions import base
from fuelclient.cli import error
from fuelclient.tests import base as base_tests
from fuelclient.tests.unit.v1 import base as base_tests
from fuelclient.tests.utils import fake_fuel_version
@ -69,17 +68,17 @@ class TestBaseAction(base_tests.UnitTestCase):
self.assertEqual(m_os.mkdir.call_count, 0)
@requests_mock.mock()
class TestFuelVersion(base_tests.UnitTestCase):
VERSION = fake_fuel_version.get_fake_fuel_version()
def test_return_yaml(self, mrequests):
mrequests.get('/api/v1/version', json=self.VERSION)
def test_return_yaml(self):
self.m_request.get('/api/v1/version', json=self.VERSION)
with mock.patch('sys.stdout') as mstdout:
with self.assertRaises(SystemExit):
self.execute(['fuel', '--fuel-version', '--yaml'])
self.assertRaises(SystemExit,
self.execute,
['fuel', '--fuel-version', '--yaml'])
args, _ = mstdout.write.call_args_list[0]
regex = ('No JSON object could be decoded'
'|Expecting value: line 1 column 1')
@ -87,11 +86,12 @@ class TestFuelVersion(base_tests.UnitTestCase):
json.loads(args[0])
self.assertEqual(self.VERSION, yaml.load(args[0]))
def test_return_json(self, mrequests):
mrequests.get('/api/v1/version', json=self.VERSION)
def test_return_json(self):
self.m_request.get('/api/v1/version', json=self.VERSION)
with mock.patch('sys.stdout') as mstdout:
with self.assertRaises(SystemExit):
self.execute(['fuel', '--fuel-version', '--json'])
self.assertRaises(SystemExit,
self.execute,
['fuel', '--fuel-version', '--json'])
args, _ = mstdout.write.call_args_list[0]
self.assertEqual(self.VERSION, json.loads(args[0]))

View File

@ -14,12 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
from mock import patch
import requests_mock as rm
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
API_INPUT = [{'id': 'primary-controller'}]
@ -29,61 +29,59 @@ MULTIPLE_RELEASES = [{'id': 1, 'version': '2014.2-6.0', 'name': 'Something'},
{'id': 2, 'version': '2014.3-6.1', 'name': 'Something'}]
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.actions.base.os')
class TestReleaseDeploymentTasksActions(base.UnitTestCase):
def test_release_tasks_download(self, mos, mopen, mrequests):
mrequests.get().json.return_value = API_INPUT
def test_release_tasks_download(self, mos, mopen):
self.m_request.get(rm.ANY, json=API_INPUT)
self.execute(
['fuel', 'rel', '--rel', '1', '--deployment-tasks', '--download'])
mopen().__enter__().write.assert_called_once_with(API_OUTPUT)
def test_release_tasks_upload(self, mos, mopen, mrequests):
def test_release_tasks_upload(self, mos, mopen):
mopen().__enter__().read.return_value = API_OUTPUT
put = self.m_request.put('/api/v1/releases/1/deployment_tasks',
json=API_OUTPUT)
self.execute(
['fuel', 'rel', '--rel', '1', '--deployment-tasks', '--upload'])
self.assertEqual(mrequests.put.call_count, 1)
call_args = mrequests.put.call_args_list[0]
url = call_args[0][0]
kwargs = call_args[1]
self.assertIn('releases/1/deployment_tasks', url)
self.assertEqual(
json.loads(kwargs['data']), API_INPUT)
self.assertTrue(put.called)
self.assertEqual(put.last_request.json(), API_INPUT)
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.actions.base.os')
class TestClusterDeploymentTasksActions(base.UnitTestCase):
def test_cluster_tasks_download(self, mos, mopen, mrequests):
mrequests.get().json.return_value = API_INPUT
def test_cluster_tasks_download(self, mos, mopen):
self.m_request.get(rm.ANY, json=API_INPUT)
self.execute(
['fuel', 'env', '--env', '1', '--deployment-tasks', '--download'])
mopen().__enter__().write.assert_called_once_with(API_OUTPUT)
def test_cluster_tasks_upload(self, mos, mopen, mrequests):
def test_cluster_tasks_upload(self, mos, mopen):
mopen().__enter__().read.return_value = API_OUTPUT
put = self.m_request.put('/api/v1/clusters/1/deployment_tasks',
json=API_OUTPUT)
self.execute(
['fuel', 'env', '--env', '1', '--deployment-tasks', '--upload'])
self.assertEqual(mrequests.put.call_count, 1)
call_args = mrequests.put.call_args_list[0]
url = call_args[0][0]
kwargs = call_args[1]
self.assertIn('clusters/1/deployment_tasks', url)
self.assertEqual(
json.loads(kwargs['data']), API_INPUT)
self.assertTrue(put.called)
self.assertEqual(put.last_request.json(), API_INPUT)
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.utils.iterfiles')
class TestSyncDeploymentTasks(base.UnitTestCase):
def test_sync_deployment_scripts(self, mfiles, mopen, mrequests):
mrequests.get().json.return_value = RELEASE_OUTPUT
def test_sync_deployment_scripts(self, mfiles, mopen):
self.m_request.get(rm.ANY, json=RELEASE_OUTPUT)
put = self.m_request.put('/api/v1/releases/1/deployment_tasks',
json={})
mfiles.return_value = ['/etc/puppet/2014.2-6.0/tasks.yaml']
mopen().__enter__().read.return_value = API_OUTPUT
file_pattern = '*tests*'
@ -93,16 +91,12 @@ class TestSyncDeploymentTasks(base.UnitTestCase):
mfiles.assert_called_once_with(
os.path.realpath(os.curdir), file_pattern)
call_args = mrequests.put.call_args_list[0]
url = call_args[0][0]
kwargs = call_args[1]
self.assertIn('releases/1/deployment_tasks', url)
self.assertEqual(
json.loads(kwargs['data']), API_INPUT)
self.assertTrue(put.called)
self.assertEqual(put.last_request.json(), API_INPUT)
@patch('fuelclient.cli.actions.release.os')
def test_sync_with_directory_path(self, mos, mfiles, mopen, mrequests):
mrequests.get().json.return_value = RELEASE_OUTPUT
def test_sync_with_directory_path(self, mos, mfiles, mopen):
self.m_request.get(rm.ANY, json=RELEASE_OUTPUT)
mos.path.realpath.return_value = real_path = '/etc/puppet'
mfiles.return_value = [real_path + '/2014.2-6.0/tasks.yaml']
mopen().__enter__().read.return_value = API_OUTPUT
@ -110,8 +104,10 @@ class TestSyncDeploymentTasks(base.UnitTestCase):
['fuel', 'rel', '--sync-deployment-tasks', '--dir', real_path])
mfiles.assert_called_once_with(real_path, '*tasks.yaml')
def test_multiple_tasks_but_one_release(self, mfiles, mopen, mrequests):
mrequests.get().json.return_value = RELEASE_OUTPUT
def test_multiple_tasks_but_one_release(self, mfiles, mopen):
self.m_request.get(rm.ANY, json=RELEASE_OUTPUT)
put = self.m_request.put(rm.ANY, json={})
mfiles.return_value = ['/etc/puppet/2014.2-6.0/tasks.yaml',
'/etc/puppet/2014.3-6.1/tasks.yaml']
mopen().__enter__().read.return_value = API_OUTPUT
@ -119,10 +115,11 @@ class TestSyncDeploymentTasks(base.UnitTestCase):
self.execute(
['fuel', 'rel', '--sync-deployment-tasks'])
self.assertEqual(mrequests.put.call_count, 1)
self.assertEqual(put.call_count, 1)
def test_multiple_releases(self, mfiles, mopen, mrequests):
mrequests.get().json.return_value = MULTIPLE_RELEASES
def test_multiple_releases(self, mfiles, mopen):
self.m_request.get(rm.ANY, json=MULTIPLE_RELEASES)
put = self.m_request.put(rm.ANY, json={})
mfiles.return_value = ['/etc/puppet/2014.2-6.0/tasks.yaml',
'/etc/puppet/2014.3-6.1/tasks.yaml']
mopen().__enter__().read.return_value = API_OUTPUT
@ -130,4 +127,4 @@ class TestSyncDeploymentTasks(base.UnitTestCase):
self.execute(
['fuel', 'rel', '--sync-deployment-tasks'])
self.assertEqual(mrequests.put.call_count, 2)
self.assertEqual(put.call_count, 2)

View File

@ -16,23 +16,23 @@
import mock
import requests_mock
import requests_mock as rm
from six import moves
from fuelclient.objects.environment import Environment
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
@requests_mock.mock()
class TestEnvironment(base.UnitTestCase):
def test_delete_operational_wo_force(self, m_requests):
def test_delete_operational_wo_force(self):
cluster_id = 1
url = '/api/v1/clusters/{0}/'.format(cluster_id)
cmd = 'fuel --env {0} env delete'.format(cluster_id)
m_requests.get(url, json={'id': cluster_id, 'status': 'operational'})
m_delete = m_requests.delete(url)
self.m_request.get(url,
json={'id': cluster_id, 'status': 'operational'})
m_delete = self.m_request.delete(url)
with mock.patch('sys.stdout', new=moves.cStringIO()) as m_stdout:
self.execute(cmd.split())
@ -40,7 +40,7 @@ class TestEnvironment(base.UnitTestCase):
self.assertFalse(m_delete.called)
def test_nova_network_using_warning(self, m_requests):
def test_nova_network_using_warning(self):
cluster_id = 1
cluster_data = {
'id': cluster_id,
@ -48,9 +48,9 @@ class TestEnvironment(base.UnitTestCase):
'mode': 'ha_compact',
'net_provider': 'neutron'
}
m_requests.post('/api/v1/clusters/', json=cluster_data)
m_requests.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
self.m_request.post('/api/v1/clusters/', json=cluster_data)
self.m_request.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
with mock.patch('sys.stdout', new=moves.cStringIO()) as m_stdout:
self.execute(
@ -61,7 +61,7 @@ class TestEnvironment(base.UnitTestCase):
'deprecated since 6.1 release.',
m_stdout.getvalue())
def test_neutron_gre_using_warning(self, m_requests):
def test_neutron_gre_using_warning(self):
cluster_id = 1
cluster_data = {
'id': cluster_id,
@ -69,9 +69,9 @@ class TestEnvironment(base.UnitTestCase):
'mode': 'ha_compact',
'net_provider': 'neutron'
}
m_requests.post('/api/v1/clusters/', json=cluster_data)
m_requests.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
self.m_request.post('/api/v1/clusters/', json=cluster_data)
self.m_request.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
with mock.patch('sys.stdout', new=moves.cStringIO()) as m_stdout:
self.execute(
@ -83,7 +83,7 @@ class TestEnvironment(base.UnitTestCase):
"deprecated since 7.0 release.",
m_stdout.getvalue())
def test_create_env_with_mode_set(self, m_requests):
def test_create_env_with_mode_set(self):
cluster_id = 1
cluster_data = {
'id': cluster_id,
@ -91,15 +91,15 @@ class TestEnvironment(base.UnitTestCase):
'mode': 'ha_compact',
'net_provider': 'neutron'
}
m_post = m_requests.post('/api/v1/clusters/', json=cluster_data)
m_requests.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
m_post = self.m_request.post('/api/v1/clusters/', json=cluster_data)
self.m_request.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
self.execute('fuel env create'
' --name test --rel 1 --mode ha'.split())
self.assertEqual('ha_compact', m_post.last_request.json()['mode'])
def test_multimode_warning(self, m_requests):
def test_multimode_warning(self):
cluster_id = 1
cluster_data = {
'id': cluster_id,
@ -107,9 +107,9 @@ class TestEnvironment(base.UnitTestCase):
'mode': 'multinode',
'net_provider': 'neutron'
}
m_post = m_requests.post('/api/v1/clusters/', json=cluster_data)
m_requests.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
m_post = self.m_request.post('/api/v1/clusters/', json=cluster_data)
self.m_request.get('/api/v1/clusters/{0}/'.format(cluster_id),
json=cluster_data)
with mock.patch('sys.stdout', new=moves.cStringIO()) as m_stdout:
self.execute('fuel env create'
@ -157,11 +157,10 @@ class TestEnvironmentOstf(base.UnitTestCase):
{'id': 1, 'status': 'running'},
{'id': 2, 'status': 'finished'}])
@mock.patch('fuelclient.client.requests')
def test_get_deployment_tasks_with_end(self, mrequests):
def test_get_deployment_tasks_with_end(self):
end = 'task1'
get = self.m_request.get(rm.ANY, json={})
self.env.get_deployment_tasks(end=end)
kwargs = mrequests.get.call_args[1]
self.assertEqual(
kwargs['params'],
{'start': None, 'end': 'task1', 'include': None})
self.assertEqual(get.last_request.qs, {'end': ['task1']})

View File

@ -14,19 +14,17 @@
import json
import mock
import requests_mock
import yaml
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient.tests.utils import fake_fuel_version
@requests_mock.mock()
class TestFuelVersion(base.UnitTestCase):
def test_return_yaml(self, mrequests):
mrequests.get('/api/v1/version/',
json=fake_fuel_version.get_fake_fuel_version())
def test_return_yaml(self):
self.m_request.get('/api/v1/version/',
json=fake_fuel_version.get_fake_fuel_version())
with mock.patch('sys.stdout') as mstdout:
self.execute(['fuel', 'fuel-version', '--yaml'])
@ -39,9 +37,9 @@ class TestFuelVersion(base.UnitTestCase):
fake_fuel_version.get_fake_fuel_version(),
yaml.safe_load(args[0]))
def test_return_json(self, mrequests):
mrequests.get('/api/v1/version/',
json=fake_fuel_version.get_fake_fuel_version())
def test_return_json(self):
self.m_request.get('/api/v1/version/',
json=fake_fuel_version.get_fake_fuel_version())
with mock.patch('sys.stdout') as mstdout:
self.execute(['fuel', 'fuel-version', '--json'])

View File

@ -19,10 +19,9 @@ import io
import os
import mock
import requests_mock
from fuelclient.cli.actions import graph
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
GRAPH_API_OUTPUT = "digraph G { A -> B -> C }"
@ -36,12 +35,10 @@ class TestGraphAction(base.UnitTestCase):
def setUp(self):
super(TestGraphAction, self).setUp()
self.requests_mock = requests_mock.mock()
self.requests_mock.start()
self.m_tasks_api = self.requests_mock.get(
self.m_tasks_api = self.m_request.get(
'/api/v1/clusters/1/deployment_tasks',
json=TASKS_API_OUTPUT)
self.m_graph_api = self.requests_mock.get(
self.m_graph_api = self.m_request.get(
'/api/v1/clusters/1/deploy_tasks/graph.gv',
text=GRAPH_API_OUTPUT)
@ -51,7 +48,6 @@ class TestGraphAction(base.UnitTestCase):
def tearDown(self):
super(TestGraphAction, self).tearDown()
self.requests_mock.stop()
self.m_full_path.stop()
def test_download_all_tasks(self):
@ -160,10 +156,9 @@ class TestGraphAction(base.UnitTestCase):
@mock.patch('fuelclient.cli.actions.graph.os.path.exists')
def test_render_no_file(self, m_exists):
m_exists.return_value = False
with self.assertRaises(SystemExit):
self.execute(
['fuel', 'graph', '--render', 'graph.gv']
)
self.assertRaises(SystemExit,
self.execute,
['fuel', 'graph', '--render', 'graph.gv'])
@mock.patch('fuelclient.cli.actions.graph.open', create=True)
@mock.patch('fuelclient.cli.actions.graph.render_graph')
@ -206,8 +201,8 @@ class TestGraphAction(base.UnitTestCase):
output_dir = '/output/dir'
self.m_full_path.return_value = output_dir
with self.assertRaises(SystemExit):
self.execute(
['fuel', 'graph', '--render', 'graph.gv', '--dir', output_dir]
)
self.assertRaises(SystemExit,
self.execute,
['fuel', 'graph', '--render',
'graph.gv', '--dir', output_dir])
m_access.assert_called_once_with(output_dir, os.W_OK)

View File

@ -13,13 +13,11 @@
# under the License.
import mock
import requests_mock
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient.tests.utils import fake_network_group
@requests_mock.mock()
class TestNetworkGroupActions(base.UnitTestCase):
def setUp(self):
@ -30,8 +28,8 @@ class TestNetworkGroupActions(base.UnitTestCase):
self.ng = fake_network_group.get_fake_network_group()
def test_list_network_groups(self, mreq):
mget = mreq.get(self.req_base_path, json={})
def test_list_network_groups(self):
mget = self.m_request.get(self.req_base_path, json={})
list_commands = [
['fuel', 'network-group', '--list'], ['fuel', 'network-group']]
@ -39,8 +37,8 @@ class TestNetworkGroupActions(base.UnitTestCase):
self.execute(cmd)
self.assertTrue(mget.called)
def test_list_network_groups_filtering(self, mreq):
mget = mreq.get(self.req_base_path, json={})
def test_list_network_groups_filtering(self):
mget = self.m_request.get(self.req_base_path, json={})
self.execute(
['fuel', 'network-group', '--node-group', str(self.ng['id'])]
@ -48,8 +46,8 @@ class TestNetworkGroupActions(base.UnitTestCase):
self.assertTrue(mget.called)
def create_network_group(self, mreq, cmd):
mpost = mreq.post(self.req_base_path, json={
def create_network_group(self, cmd):
mpost = self.m_request.post(self.req_base_path, json={
'id': self.ng['id'],
'name': self.ng['name'],
})
@ -63,76 +61,76 @@ class TestNetworkGroupActions(base.UnitTestCase):
return call_data
def test_create_network_group(self, mreq):
def test_create_network_group(self):
cmd = ['fuel', 'network-group', '--create', '--cidr', self.ng['cidr'],
'--name', self.ng['name'], '--node-group', str(self.ng['id'])]
self.create_network_group(mreq, cmd)
self.create_network_group(cmd)
def test_create_network_group_w_meta(self, mreq):
def test_create_network_group_w_meta(self):
cmd = ['fuel', 'network-group', '--create', '--cidr', self.ng['cidr'],
'--name', self.ng['name'], '--node-group', str(self.ng['id']),
'--meta', '{"ip_ranges": ["10.0.0.2", "10.0.0.254"]}']
self.create_network_group(mreq, cmd)
self.create_network_group(cmd)
meta = mreq.last_request.json()['meta']
meta = self.m_request.last_request.json()['meta']
self.assertEqual(meta['ip_ranges'], ["10.0.0.2", "10.0.0.254"])
def test_create_network_group_required_args(self, mreq):
def test_create_network_group_required_args(self):
with mock.patch("sys.stderr") as m_stderr:
with self.assertRaises(SystemExit):
self.execute(
['fuel', 'network-group', '--create'])
self.assertRaises(SystemExit,
self.execute,
['fuel', 'network-group', '--create'])
self.assertIn('--nodegroup", "--name" and "--cidr" required!',
m_stderr.write.call_args[0][0])
def test_delete_network_group_required_args(self, mreq):
def test_delete_network_group_required_args(self):
with mock.patch("sys.stderr") as m_stderr:
with self.assertRaises(SystemExit):
self.execute(
['fuel', 'network-group', '--delete'])
self.assertRaises(SystemExit,
self.execute,
['fuel', 'network-group', '--delete'])
self.assertIn('"--network" required!', m_stderr.write.call_args[0][0])
def test_delete_network_group(self, mreq):
def test_delete_network_group(self):
path = self.req_base_path + str(self.env_id) + '/'
mdelete = mreq.delete(path, status_code=204)
mdelete = self.m_request.delete(path, status_code=204)
self.execute(
['fuel', 'network-group', '--delete',
'--network', str(self.env_id)])
self.assertTrue(mdelete.called)
def test_network_group_duplicate_name(self, mreq):
mpost = mreq.post(self.req_base_path, status_code=409)
def test_network_group_duplicate_name(self):
mpost = self.m_request.post(self.req_base_path, status_code=409)
with mock.patch("sys.stderr") as m_stderr:
with self.assertRaises(SystemExit):
self.execute(
['fuel', 'network-group', '--create', '--cidr',
self.ng['cidr'], '--name', self.ng['name'],
'--node-group', str(self.ng['id'])])
self.assertRaises(SystemExit,
self.execute,
['fuel', 'network-group', '--create', '--cidr',
self.ng['cidr'], '--name', self.ng['name'],
'--node-group', str(self.ng['id'])])
self.assertIn("409 Client Error", m_stderr.write.call_args[0][0])
self.assertTrue(mpost.called)
def test_set_network_group(self, mreq):
def test_set_network_group(self):
path = self.req_base_path + str(self.env_id) + '/'
mput = mreq.put(path, json={})
mput = self.m_request.put(path, json={})
self.execute([
'fuel', 'network-group', '--set', '--network', str(self.ng['id']),
'--name', self.ng['name']])
self.assertTrue(mput.called)
def test_set_network_group_meta(self, mreq):
def test_set_network_group_meta(self):
path = self.req_base_path + str(self.env_id) + '/'
mput = mreq.put(path, json={})
mput = self.m_request.put(path, json={})
self.execute([
'fuel', 'network-group', '--set', '--network', str(self.ng['id']),
'--meta', '{"ip_ranges": ["10.0.0.2", "10.0.0.254"]}'])
self.assertTrue(mput.called)
meta = mreq.last_request.json()['meta']
meta = self.m_request.last_request.json()['meta']
self.assertEqual(meta['ip_ranges'], ["10.0.0.2", "10.0.0.254"])

View File

@ -17,9 +17,8 @@
import yaml
from mock import patch
import requests_mock
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
ENV_OUTPUT = {
@ -44,22 +43,21 @@ NETWORK_CONFIG_ERROR_OUTPUT = {
}
@requests_mock.mock(kw='mrequests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.actions.base.os')
class TestNetworkActions(base.UnitTestCase):
def test_network_download(self, mos, mopen, mrequests=None):
mrequests.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
mrequests.get('/api/v1/clusters/1/network_configuration/neutron',
json=yaml.load(FILE_INPUT))
def test_network_download(self, mos, mopen):
self.m_request.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
self.m_request.get('/api/v1/clusters/1/network_configuration/neutron',
json=yaml.load(FILE_INPUT))
self.execute(['fuel', 'network', '--env', '1', '--download'])
mopen().__enter__().write.assert_called_once_with(FILE_INPUT)
def test_network_upload(self, mos, mopen, mrequests=None):
def test_network_upload(self, mos, mopen):
mopen().__enter__().read.return_value = FILE_INPUT
mrequests.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
mneutron_put = mrequests.put(
self.m_request.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
mneutron_put = self.m_request.put(
'/api/v1/clusters/1/network_configuration/neutron',
json=NETWORK_CONFIG_OK_OUTPUT)
self.execute(['fuel', 'network', '--env', '1', '--upload', 'smth'])
@ -67,10 +65,10 @@ class TestNetworkActions(base.UnitTestCase):
url = mneutron_put.request_history[0].url
self.assertIn('clusters/1/network_configuration/neutron', url)
def test_network_upload_with_error(self, mos, mopen, mrequests=None):
def test_network_upload_with_error(self, mos, mopen):
mopen().__enter__().read.return_value = FILE_INPUT
mrequests.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
mrequests.put(
self.m_request.get('/api/v1/clusters/1/', json=ENV_OUTPUT)
self.m_request.put(
'/api/v1/clusters/1/network_configuration/neutron',
json=NETWORK_CONFIG_ERROR_OUTPUT)
with patch('sys.stdout') as fake_out:

View File

@ -13,15 +13,13 @@
# under the License.
import mock
import requests_mock
from six import StringIO
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient.tests.utils import fake_env
from fuelclient.tests.utils import fake_network_group
@requests_mock.mock()
class TestNodeGroupActions(base.UnitTestCase):
def setUp(self):
@ -31,27 +29,26 @@ class TestNodeGroupActions(base.UnitTestCase):
self.req_base_path = '/api/v1/nodegroups/'
self.ng = fake_network_group.get_fake_network_group()
def test_list_nodegroups(self, mreq):
mget = mreq.get(self.req_base_path, json=[])
def test_list_nodegroups(self):
mget = self.m_request.get(self.req_base_path, json=[])
self.execute(['fuel', 'nodegroup', '--list'])
self.assertTrue(mget.called)
def test_create_nodegroup(self, mreq):
def test_create_nodegroup(self):
neutron_url = \
'/api/v1/clusters/{0}/network_configuration/neutron'.format(
self.env['id']
)
mreq.get('/api/v1/clusters/{0}/'.format(self.env['id']), json={
'id': self.env['id'],
'net_provider': self.env['net_provider']
})
mpost = mreq.post(self.req_base_path, json={
'id': self.ng['id'],
'name': self.ng['name'],
})
mget = mreq.get(neutron_url, json={'networking_parameters': {}})
self.m_request.get('/api/v1/clusters/{0}/'.format(self.env['id']),
json={'id': self.env['id'],
'net_provider': self.env['net_provider']})
mpost = self.m_request.post(self.req_base_path,
json={'id': self.ng['id'],
'name': self.ng['name']})
mget = self.m_request.get(neutron_url,
json={'networking_parameters': {}})
with mock.patch('sys.stdout', new=StringIO()) as m_stdout:
self.execute([
'fuel', 'nodegroup', '--create',
@ -74,12 +71,11 @@ class TestNodeGroupActions(base.UnitTestCase):
def _check_required_message_for_commands(self, err_msg, commands):
for cmd in commands:
with mock.patch("sys.stderr") as m_stderr:
with self.assertRaises(SystemExit):
self.execute(cmd)
self.assertRaises(SystemExit, self.execute, cmd)
m_stderr.write.assert_called_with(err_msg)
def test_create_nodegroup_arguments_required(self, mreq):
def test_create_nodegroup_arguments_required(self):
err_msg = '"--env" and "--name" required!\n'
env_not_present = ['fuel', 'nodegroup', '--create',
@ -91,11 +87,11 @@ class TestNodeGroupActions(base.UnitTestCase):
self._check_required_message_for_commands(
err_msg, (env_not_present, name_not_present))
def test_delete_nodegroup(self, mreq):
def test_delete_nodegroup(self):
path = self.req_base_path + str(self.env['id']) + '/'
mget = mreq.get(path, json={'name': 'test group'})
mget = self.m_request.get(path, json={'name': 'test group'})
delete_path = self.req_base_path + str(self.env['id']) + '/'
mdelete = mreq.delete(delete_path, status_code=204)
mdelete = self.m_request.delete(delete_path, status_code=204)
ngid = self.env['id']
with mock.patch('sys.stdout', new=StringIO()) as m_stdout:
self.execute(['fuel', 'nodegroup', '--delete', '--group',
@ -109,25 +105,26 @@ class TestNodeGroupActions(base.UnitTestCase):
self.assertTrue(mget.called)
self.assertTrue(mdelete.called)
def test_delete_nodegroup_group_arg_required(self, mreq):
def test_delete_nodegroup_group_arg_required(self):
err_msg = '"--group" required!\n'
self._check_required_message_for_commands(
err_msg,
(['fuel', 'nodegroup', '--delete', '--default'],)
)
def test_assign_nodegroup_fails_w_multiple_groups(self, mreq):
def test_assign_nodegroup_fails_w_multiple_groups(self):
err_msg = "Nodes can only be assigned to one node group.\n"
with mock.patch("sys.stderr") as m_stderr:
with self.assertRaises(SystemExit):
self.execute(['fuel', 'nodegroup', '--assign', '--node', '1',
'--group', '2,3'])
self.assertRaises(SystemExit,
self.execute,
['fuel', 'nodegroup', '--assign', '--node',
'1', '--group', '2,3'])
msg = m_stderr.write.call_args[0][0]
self.assertEqual(msg, err_msg)
@mock.patch('fuelclient.objects.nodegroup.NodeGroup.assign')
def test_assign_nodegroup(self, m_req, m_assign):
def test_assign_nodegroup(self, m_assign):
self.execute(['fuel', 'nodegroup', '--assign', '--node', '1',
'--group', '2'])
m_assign.assert_called_with([1])
@ -136,7 +133,7 @@ class TestNodeGroupActions(base.UnitTestCase):
'--group', '2'])
m_assign.assert_called_with([1, 2, 3])
def test_node_group_assign_arguments_required(self, mreq):
def test_node_group_assign_arguments_required(self):
err_msg = '"--node" and "--group" required!\n'
node_not_present_cmd = ['fuel', 'nodegroup', '--assign',

View File

@ -14,77 +14,85 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from mock import patch
import requests_mock as rm
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient.tests.utils import fake_node
@patch('fuelclient.client.requests')
class TestNodeExecuteTasksAction(base.UnitTestCase):
def setUp(self):
super(TestNodeExecuteTasksAction, self).setUp()
self.tasks = ['netconfig', 'hiera', 'install']
def test_execute_provided_list_of_tasks(self, mrequests):
node_patch = patch('fuelclient.objects.node.Node.get_fresh_data')
m_fresh_data = node_patch.start()
m_fresh_data.return_value = fake_node.get_fake_node()
self.addCleanup(node_patch.stop)
def test_execute_provided_list_of_tasks(self):
put = self.m_request.put(rm.ANY, json={'id': 43})
self.execute(['fuel', 'node', '--node', '1,2', '--tasks'] + self.tasks)
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks)
self.assertEqual(put.last_request.json(), self.tasks)
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_skipped_tasks(self, get_tasks, mrequests):
def test_skipped_tasks(self, get_tasks):
get_tasks.return_value = [{'id': t} for t in self.tasks]
put = self.m_request.put(rm.ANY, json={'id': 43})
self.execute(
['fuel', 'node', '--node', '1,2', '--skip'] + self.tasks[:2])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks[2:])
self.assertEqual(put.last_request.json(), self.tasks[2:])
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_included_tasks(self, get_tasks, mrequests):
def test_included_tasks(self, get_tasks):
get_tasks.return_value = [{'id': t} for t in self.tasks]
put = self.m_request.put(rm.ANY, json={'id': 43})
self.execute(
['fuel', 'node', '--node', '1', '--start', 'netconfig',
'--tasks', 'hiera'])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks)
self.assertEqual(put.last_request.json(), self.tasks)
get_tasks.assert_called_once_with(
start='netconfig', end=None, include=['hiera'])
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_dont_fail_on_empty_tasks(self, get_tasks, mrequests):
def test_dont_fail_on_empty_tasks(self, get_tasks):
get_tasks.return_value = []
self.execute(
['fuel', 'node', '--node', '1', '--start', 'netconfig'])
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_end_param(self, get_tasks, mrequests):
def test_end_param(self, get_tasks):
put = self.m_request.put(rm.ANY, json={'id': 43})
get_tasks.return_value = [{'id': t} for t in self.tasks[:2]]
self.execute(
['fuel', 'node', '--node', '1,2', '--end', self.tasks[-2]])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks[:2])
self.assertEqual(put.last_request.json(), self.tasks[:2])
get_tasks.assert_called_once_with(
end=self.tasks[-2], start=None, include=None)
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_skip_with_end_param(self, get_tasks, mrequests):
def test_skip_with_end_param(self, get_tasks):
get_tasks.return_value = [{'id': t} for t in self.tasks]
put = self.m_request.put(rm.ANY, json={'id': 43})
self.execute(
['fuel', 'node', '--node', '1,2',
'--end', self.tasks[-1], '--skip'] + self.tasks[:2])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks[2:])
self.assertEqual(put.last_request.json(), self.tasks[2:])
get_tasks.assert_called_once_with(
end=self.tasks[-1], start=None, include=None)
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_start_with_end_param(self, get_tasks, mrequests):
def test_start_with_end_param(self, get_tasks):
"""end will be included."""
put = self.m_request.put(rm.ANY, json={'id': 43})
start = 1
end = 2
get_tasks.return_value = [{'id': t} for t in self.tasks[start:end + 1]]
@ -92,18 +100,17 @@ class TestNodeExecuteTasksAction(base.UnitTestCase):
['fuel', 'node', '--node', '1,2', '--start', self.tasks[start],
'--end', self.tasks[end]])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks[start:end + 1])
self.assertEqual(put.last_request.json(), self.tasks[start:end + 1])
get_tasks.assert_called_once_with(
end=self.tasks[2], start=self.tasks[1], include=None)
@patch('fuelclient.objects.environment.Environment.get_deployment_tasks')
def test_start_param(self, get_tasks, mrequests):
def test_start_param(self, get_tasks):
put = self.m_request.put(rm.ANY, json={'id': 43})
get_tasks.return_value = [{'id': t} for t in self.tasks[1:]]
self.execute(
['fuel', 'node', '--node', '1,2', '--start', self.tasks[1]])
kwargs = mrequests.put.call_args_list[0][1]
self.assertEqual(json.loads(kwargs['data']), self.tasks[1:])
self.assertEqual(put.last_request.json(), self.tasks[1:])
get_tasks.assert_called_once_with(
start=self.tasks[1], end=None, include=None)

View File

@ -18,7 +18,7 @@ import mock
from six import StringIO
from fuelclient.cli.actions import node
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
GRAPH_API_OUTPUT = "digraph G { A -> B -> C }"
@ -34,8 +34,9 @@ class TestNodeStartAction(base.UnitTestCase):
def test_node_not_assigend(self, _):
for method in ('--deploy', '--provision'):
with mock.patch('sys.stderr', new=StringIO()) as mstderr:
with self.assertRaises(SystemExit):
self.execute(['fuel', 'node', method, '--node', '8'])
self.assertRaises(SystemExit,
self.execute,
['fuel', 'node', method, '--node', '8'])
self.assertIn(
"Input nodes are not assigned to any environment!",
mstderr.getvalue())

View File

@ -14,74 +14,69 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from mock import Mock
from mock import patch
import requests_mock as rm
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
@patch('fuelclient.client.requests')
class TestNotificationsActions(base.UnitTestCase):
def test_notification_send(self, mrequests):
response_mock = Mock(status_code=201)
mrequests.post.return_value = response_mock
def test_notification_send(self):
post = self.m_request.post(rm.ANY, json={})
self.execute(
['fuel', 'notifications', '--send', 'test message'])
self.assertEqual(mrequests.post.call_count, 1)
request = json.loads(mrequests.post.call_args[1]['data'])
self.assertEqual(post.call_count, 1)
request = post.last_request.json()
self.assertEqual('test message', request['message'])
self.assertEqual('done', request['topic'])
self.execute(
['fuel', 'notify', '-m', 'test message 2'])
self.assertEqual(mrequests.post.call_count, 2)
request = json.loads(mrequests.post.call_args[1]['data'])
self.assertEqual(post.call_count, 2)
request = post.last_request.json()
self.assertEqual('test message 2', request['message'])
self.assertEqual('done', request['topic'])
def test_notification_send_with_topic(self, mrequests):
response_mock = Mock(status_code=201)
mrequests.post.return_value = response_mock
def test_notification_send_with_topic(self):
post = self.m_request.post(rm.ANY, json={})
self.execute(
['fuel', 'notifications', '--send', 'test error',
'--topic', 'error'])
self.assertEqual(mrequests.post.call_count, 1)
request = json.loads(mrequests.post.call_args[1]['data'])
self.assertEqual(post.call_count, 1)
request = post.last_request.json()
self.assertEqual('test error', request['message'])
self.assertEqual('error', request['topic'])
self.execute(
['fuel', 'notify', '-m', 'test error 2', '--topic', 'error'])
self.assertEqual(mrequests.post.call_count, 2)
request = json.loads(mrequests.post.call_args[1]['data'])
self.assertEqual(post.call_count, 2)
request = post.last_request.json()
self.assertEqual('test error 2', request['message'])
self.assertEqual('error', request['topic'])
def test_notification_send_no_message(self, mrequests):
response_mock = Mock(status_code=201)
mrequests.post.return_value = response_mock
def test_notification_send_no_message(self):
post = self.m_request.post(rm.ANY, json={})
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'notifications', '--send']
)
self.assertEqual(mrequests.post.call_count, 0)
self.assertFalse(post.called)
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'notify', '-m']
)
self.assertEqual(mrequests.post.call_count, 0)
self.assertFalse(post.called)
def test_notification_send_invalid_topic(self, mrequests):
response_mock = Mock(status_code=201)
mrequests.post.return_value = response_mock
def test_notification_send_invalid_topic(self):
post = self.m_request.post(rm.ANY, json={})
self.assertRaises(
SystemExit,
@ -89,120 +84,109 @@ class TestNotificationsActions(base.UnitTestCase):
['fuel', 'notifications', '--send', 'test message',
'--topic', 'x']
)
self.assertEqual(mrequests.post.call_count, 0)
self.assertFalse(post.called)
self.assertRaises(
SystemExit,
self.execute,
['fuel', 'notify', '-m', 'test message', '--topic', 'x']
)
self.assertEqual(mrequests.post.call_count, 0)
self.assertFalse(post.called)
def test_mark_as_read(self, mrequests):
m1 = Mock(status=200)
m1.json.return_value = {
'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done',
}
m2 = Mock(status=200)
m2.json.return_value = {
'id': 2,
'message': 'test message 2',
'status': 'unread',
'topic': 'done',
}
mrequests.get.side_effect = [m1, m2]
def test_mark_as_read(self):
results = [{'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done'},
{'id': 2,
'message': 'test message 2',
'status': 'unread',
'topic': 'done'}]
results.extend(results)
get = self.m_request.get(rm.ANY, [{'json': r} for r in results])
put = self.m_request.put(rm.ANY, json={})
mrequests.put.return_value = Mock(status_code=200)
self.execute(
['fuel', 'notifications', '-r', '1'])
self.assertEqual(mrequests.get.call_count, 1)
self.assertEqual(mrequests.put.call_count, 1)
request = m1.json.return_value
self.assertEqual('test message', request['message'])
self.assertEqual('read', request['status'])
request = m2.json.return_value
self.assertEqual('test message 2', request['message'])
self.assertEqual('unread', request['status'])
self.assertEqual(get.call_count, 1)
self.assertEqual(put.call_count, 1)
mrequests.get.side_effect = [m1, m2]
messages = put.last_request.json()
self.assertEqual(1, len(messages))
msg = messages.pop()
self.assertEqual('test message', msg['message'])
self.assertEqual('read', msg['status'])
self.assertEqual(1, msg['id'])
self.execute(
['fuel', 'notifications', '-r', '1', '2'])
self.assertEqual(mrequests.get.call_count, 3)
self.assertEqual(mrequests.put.call_count, 2)
request = m1.json.return_value
self.assertEqual('test message', request['message'])
self.assertEqual('read', request['status'])
request = m2.json.return_value
self.assertEqual('test message 2', request['message'])
self.assertEqual('read', request['status'])
self.assertEqual(get.call_count, 3)
self.assertEqual(put.call_count, 2)
def test_mark_all_as_read(self, mrequests):
m = Mock(status=200)
m.json.return_value = [
{
'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done',
},
{
'id': 2,
'message': 'test message 2',
'status': 'unread',
'topic': 'done',
}
]
mrequests.get.return_value = m
messages = put.last_request.json()
self.assertEqual(2, len(messages))
msg = messages.pop()
self.assertEqual('test message', msg['message'])
self.assertEqual('read', msg['status'])
self.assertEqual(1, msg['id'])
msg = messages.pop()
self.assertEqual('test message 2', msg['message'])
self.assertEqual('read', msg['status'])
self.assertEqual(2, msg['id'])
def test_mark_all_as_read(self):
result = [{'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done'},
{'id': 2,
'message': 'test message 2',
'status': 'unread',
'topic': 'done'}]
get = self.m_request.get(rm.ANY, json=result)
put = self.m_request.put(rm.ANY, json={})
mrequests.put.return_value = Mock(status_code=200)
self.execute(
['fuel', 'notifications', '-r', '*'])
self.assertEqual(mrequests.get.call_count, 1)
self.assertEqual(mrequests.put.call_count, 1)
request = m.json.return_value
self.assertEqual(get.call_count, 1)
self.assertEqual(put.call_count, 1)
request = put.last_request.json()
self.assertEqual('test message', request[0]['message'])
self.assertEqual('read', request[0]['status'])
self.assertEqual('test message 2', request[1]['message'])
self.assertEqual('read', request[1]['status'])
@patch('fuelclient.cli.actions.notifications.format_table')
def test_list_notifications(self, mformat_table, mrequests):
m = Mock(status=200)
m.json.return_value = [
{
'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done',
},
{
'id': 2,
'message': 'test message 2',
'status': 'read',
'topic': 'done',
}
]
mrequests.get.return_value = m
def test_list_notifications(self, mformat_table):
test_notifications = [{'id': 1,
'message': 'test message',
'status': 'unread',
'topic': 'done'},
{'id': 2,
'message': 'test message 2',
'status': 'read',
'topic': 'done'}]
get = self.m_request.get(rm.ANY, json=test_notifications)
self.m_request.put(rm.ANY, json={})
mrequests.put.return_value = Mock(status_code=200)
self.execute(['fuel', 'notifications'])
self.assertEqual(mrequests.get.call_count, 1)
self.assertEqual(get.call_count, 1)
notifications = mformat_table.call_args[0][0]
self.assertEqual(len(notifications), 1)
self.assertDictEqual(notifications[0], m.json.return_value[0])
self.assertDictEqual(notifications[0], test_notifications[0])
@patch('fuelclient.cli.actions.notifications.format_table')
def test_list_all_notifications(self, mformat_table, mrequests):
m = Mock(status=200)
m.json.return_value = [
def test_list_all_notifications(self, mformat_table):
test_notifications = [
{
'id': 1,
'message': 'test message',
@ -216,12 +200,12 @@ class TestNotificationsActions(base.UnitTestCase):
'topic': 'done',
}
]
mrequests.get.return_value = m
get = self.m_request.get(rm.ANY, json=test_notifications)
self.m_request.put(rm.ANY, json={})
mrequests.put.return_value = Mock(status_code=200)
self.execute(['fuel', 'notifications', '-a'])
self.assertEqual(mrequests.get.call_count, 1)
self.assertEqual(get.call_count, 1)
notifications = mformat_table.call_args[0][0]
self.assertEqual(len(notifications), 2)
self.assertListEqual(notifications, m.json.return_value)
self.assertListEqual(notifications, test_notifications)

View File

@ -16,15 +16,16 @@
import mock
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
class TestParser(base.UnitTestCase):
def test_choose_only_one_format(self):
with mock.patch('sys.stderr') as mstderr:
with self.assertRaises(SystemExit):
self.execute(['fuel', '--json', '--yaml'])
self.assertRaises(SystemExit,
self.execute,
['fuel', '--json', '--yaml'])
args, _ = mstderr.write.call_args
self.assertRegexpMatches(
args[0],

View File

@ -22,14 +22,14 @@ import time
import mock
import pytest
import requests
import requests_mock as rm
from six import moves as six_moves
from fuelclient import client
from fuelclient import fuelclient_settings
from fuelclient.objects import node
from fuelclient import profiler
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
from fuelclient.tests import utils
@ -77,18 +77,10 @@ class ClientPerfTest(base.UnitTestCase):
def setUp(self):
super(ClientPerfTest, self).setUp()
req_patcher = mock.patch.object(requests.api, 'request')
token_patcher = mock.patch.object(client.Client, 'auth_token',
new_callable=mock.PropertyMock)
self.mock_request = req_patcher.start()
self.mock_auth_token = token_patcher.start()
def tearDown(self):
super(ClientPerfTest, self).tearDown()
self.mock_request.stop()
self.mock_auth_token.stop()
self.addCleanup(self.mock_auth_token.stop)
@classmethod
def get_random_nodes(cls, number):
@ -108,13 +100,17 @@ class ClientPerfTest(base.UnitTestCase):
m_responses = []
for resp in responses:
m_resp = requests.models.Response()
m_resp.encoding = 'utf8'
m_resp._content = resp
m_resp = {'text': resp, 'status': 200}
m_responses.append(m_resp)
self.mock_request.side_effect = m_responses
self.m_request.stop()
self.m_request = rm.Mocker()
self.top_matcher = self.m_request.register_uri(rm.ANY,
rm.ANY,
m_responses)
self.addCleanup(self.m_request.stop)
def test_list_nodes(self):
nodes_text = json.dumps(self.nodes)

View File

@ -16,13 +16,12 @@
from mock import patch
from fuelclient.tests import base
from fuelclient.cli.actions import PluginAction
from fuelclient.cli import error
from fuelclient.cli.formatting import format_table
from fuelclient.cli.serializers import Serializer
from fuelclient.objects.plugins import Plugins
from fuelclient.tests.unit.v1 import base
plugin_data = {

View File

@ -21,7 +21,7 @@ from fuelclient.cli import error
from fuelclient.objects.plugins import Plugins
from fuelclient.objects.plugins import PluginV1
from fuelclient.objects.plugins import PluginV2
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
@patch('fuelclient.objects.plugins.raise_error_if_not_master')

View File

@ -14,34 +14,29 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
from mock import patch
import requests_mock as rm
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
API_INPUT = {'config': 'nova_network'}
API_OUTPUT = 'config: nova_network\n'
@patch('fuelclient.client.requests')
@patch('fuelclient.cli.serializers.open', create=True)
@patch('fuelclient.cli.actions.base.os')
class TestReleaseNetworkActions(base.UnitTestCase):
def test_release_network_download(self, mos, mopen, mrequests):
mrequests.get().json.return_value = API_INPUT
def test_release_network_download(self, mos, mopen):
self.m_request.get(rm.ANY, json=API_INPUT)
self.execute(['fuel', 'rel', '--rel', '1', '--network', '--download'])
mopen().__enter__().write.assert_called_once_with(API_OUTPUT)
def test_release_network_upload(self, mos, mopen, mrequests):
def test_release_network_upload(self, mos, mopen):
mopen().__enter__().read.return_value = API_OUTPUT
put = self.m_request.put('/api/v1/releases/1/networks', json={})
self.execute(['fuel', 'rel', '--rel', '1', '--network', '--upload'])
self.assertEqual(mrequests.put.call_count, 1)
call_args = mrequests.put.call_args_list[0]
url = call_args[0][0]
kwargs = call_args[1]
self.assertIn('releases/1/networks', url)
self.assertEqual(
json.loads(kwargs['data']), API_INPUT)
self.assertTrue(put.called)
self.assertEqual(put.last_request.json(), API_INPUT)

View File

@ -13,11 +13,10 @@
# under the License.
from mock import patch
import requests_mock
import yaml
from fuelclient.cli.serializers import Serializer
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
API_IN = """name: my_role
"""
@ -25,15 +24,14 @@ API_IN = """name: my_role
API_OUT = yaml.load(API_IN)
@requests_mock.mock()
class TestRoleActions(base.UnitTestCase):
release_id = 2
def test_list_roles(self, mreq):
def test_list_roles(self):
url = '/api/v1/releases/{0}/roles/'.format(self.release_id)
cmd = 'fuel role --rel {0}'.format(self.release_id)
get_request = mreq.get(url, json=[API_OUT])
get_request = self.m_request.get(url, json=[API_OUT])
self.execute(cmd.split())
@ -41,11 +39,11 @@ class TestRoleActions(base.UnitTestCase):
self.assertIn(url, get_request.last_request.url)
@patch('fuelclient.cli.serializers.open', create=True)
def test_get_role(self, mreq, mopen):
def test_get_role(self, mopen):
url = '/api/v1/releases/{0}/roles/my_role/'.format(self.release_id)
cmd = 'fuel role --role my_role --file myfile.yaml --rel {0}'.format(
self.release_id)
get_request = mreq.get(url, json=API_OUT)
get_request = self.m_request.get(url, json=API_OUT)
self.execute(cmd.split())
@ -54,12 +52,12 @@ class TestRoleActions(base.UnitTestCase):
self.assertIn(url, get_request.last_request.url)
@patch('fuelclient.cli.serializers.open', create=True)
def test_create_role(self, mreq, mopen):
def test_create_role(self, mopen):
url = '/api/v1/releases/{0}/roles/'.format(self.release_id)
cmd = 'fuel role --create --file myfile.yaml --rel {0}'.format(
self.release_id)
mopen().__enter__().read.return_value = API_IN
post_request = mreq.post(url, json=API_OUT)
post_request = self.m_request.post(url, json=API_OUT)
self.execute(cmd.split())
@ -69,12 +67,12 @@ class TestRoleActions(base.UnitTestCase):
API_OUT, post_request.last_request.json())
@patch('fuelclient.cli.serializers.open', create=True)
def test_update_role(self, mreq, mopen):
def test_update_role(self, mopen):
url = '/api/v1/releases/{0}/roles/my_role/'.format(self.release_id)
cmd = 'fuel role --update --file myfile.yaml --rel {0}'.format(
self.release_id)
mopen().__enter__().read.return_value = API_IN
put_request = mreq.put(url, json=API_OUT)
put_request = self.m_request.put(url, json=API_OUT)
self.execute(cmd.split())
@ -83,21 +81,21 @@ class TestRoleActions(base.UnitTestCase):
self.assertEqual(
API_OUT, put_request.last_request.json())
def test_delete_role(self, mreq):
def test_delete_role(self):
url = '/api/v1/releases/{0}/roles/my_role/'.format(self.release_id)
cmd = 'fuel role --delete --role my_role --rel {0}'.format(
self.release_id)
delete_request = mreq.delete(url, json=API_OUT)
delete_request = self.m_request.delete(url, json=API_OUT)
self.execute(cmd.split())
self.assertTrue(delete_request.called)
self.assertIn(url, delete_request.last_request.url)
def test_formatting_for_list_roles(self, mreq):
def test_formatting_for_list_roles(self):
url = '/api/v1/releases/{0}/roles/'.format(self.release_id)
cmd = 'fuel role --rel {0}'.format(self.release_id)
get_request = mreq.get(url, json=[API_OUT])
get_request = self.m_request.get(url, json=[API_OUT])
with patch.object(Serializer, 'print_to_output') as mock_print:
with patch('fuelclient.cli.actions.role.format_table') \

View File

@ -16,7 +16,7 @@ import mock
from mock import call
from mock import patch
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
class TestSnapshot(base.UnitTestCase):

View File

@ -18,14 +18,13 @@ import io
import mock
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
@mock.patch('fuelclient.client.requests')
class TestPluginsActions(base.UnitTestCase):
@mock.patch('fuelclient.cli.actions.token.APIClient')
def test_token_action(self, mAPIClient, mrequests):
def test_token_action(self, mAPIClient):
with mock.patch('sys.stdout', new=io.StringIO()) as mstdout:
token = u'token123'
mauth_token = mock.PropertyMock(return_value=token)

View File

@ -16,7 +16,7 @@ import mock
from fuelclient.cli.actions.user import UserAction
from fuelclient.cli.error import ArgumentException
from fuelclient.tests import base
from fuelclient.tests.unit.v1 import base
class TestChangePassword(base.UnitTestCase):

View File

@ -19,14 +19,19 @@ import sys
import mock
try:
from unittest.case import TestCase
except ImportError:
# Runing unit-tests in production environment all
from unittest2.case import TestCase
import fuelclient
from fuelclient.cli import error
from fuelclient.commands import environment as env
from fuelclient import main as main_mod
from fuelclient.tests import base
class BaseCLITest(base.UnitTestCase):
class BaseCLITest(TestCase):
"""Base class for testing the new CLI
It mocks the whole API layer in order to be sure the
@ -37,14 +42,14 @@ class BaseCLITest(base.UnitTestCase):
"""
def setUp(self):
super(BaseCLITest, self).setUp()
self._get_client_patcher = mock.patch.object(fuelclient, 'get_client')
self.m_get_client = self._get_client_patcher.start()
self.m_client = mock.MagicMock()
self.m_get_client.return_value = self.m_client
def tearDown(self):
self._get_client_patcher.stop()
self.addCleanup(self._get_client_patcher.stop)
def exec_command(self, command=''):
"""Executes fuelclient with the specified arguments."""

View File

@ -19,12 +19,19 @@ from mock import patch
import requests_mock as rm
from six.moves.urllib import parse as urlparse
try:
from unittest.case import TestCase
except ImportError:
# Runing unit-tests in production environment all
from unittest2.case import TestCase
from fuelclient import client
from fuelclient.tests import base
class BaseLibTest(base.UnitTestCase):
class BaseLibTest(TestCase):
def setUp(self):
super(BaseLibTest, self).setUp()
self.m_request = rm.Mocker()
self.m_request.start()
@ -49,5 +56,7 @@ class BaseLibTest(base.UnitTestCase):
self.assertFalse(self.top_matcher.called, msg=msg)
super(BaseLibTest, self).tearDown()
def get_object_uri(self, collection_path, object_id, attribute='/'):
return urlparse.urljoin(collection_path, str(object_id) + attribute)