Use requests-mock instead of custom fakes

Mocking out the entire http layer means there is no testing being
performed that a request is actually being sent out as expected.
requests-mock mocks requests and responses at the point where it would
be sent out over the wire so we can better see actual made requests.

Change-Id: I66657ecc6a53b23cecfe92c911aee6fd27c68f40
This commit is contained in:
Jamie Lennox 2016-09-12 21:10:31 +10:00 committed by Kaiyan Sheng
parent 7664f93d00
commit d50f05a60b
3 changed files with 124 additions and 155 deletions

View File

@ -14,7 +14,6 @@
# limitations under the License. # limitations under the License.
from keystoneclient.v3 import client as ksclient from keystoneclient.v3 import client as ksclient
from oslo_serialization import jsonutils
def script_keystone_client(token=None): def script_keystone_client(token=None):
@ -33,13 +32,6 @@ def script_keystone_client(token=None):
'abcd1234', 'test')) 'abcd1234', 'test'))
def fake_headers():
return {'X-Auth-Token': 'abcd1234',
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'python-monascaclient'}
class FakeServiceCatalog(object): class FakeServiceCatalog(object):
def url_for(self, endpoint_type, service_type): def url_for(self, endpoint_type, service_type):
@ -52,36 +44,3 @@ class FakeKeystone(object):
def __init__(self, auth_token, project_id): def __init__(self, auth_token, project_id):
self.auth_token = auth_token self.auth_token = auth_token
self.project_id = project_id self.project_id = project_id
class FakeRaw(object):
version = 110
class FakeHTTPResponse(object):
version = 1.1
def __init__(self, status_code=None, reason=None, headers=None, content=None):
self.headers = headers
self.content = content
self.status_code = status_code
self.reason = reason
self.raw = FakeRaw()
def getheader(self, name, default=None):
return self.headers.get(name, default)
def getheaders(self):
return self.headers.items()
def read(self, amt=None):
b = self.content
self.content = None
return b
def iter_content(self, chunksize):
return self.content
def json(self):
return jsonutils.loads(self.content)

View File

@ -19,10 +19,10 @@ import sys
import fixtures import fixtures
from keystoneclient.v3 import client as ksclient from keystoneclient.v3 import client as ksclient
from mox3 import mox from mox3 import mox
from requests_mock.contrib import fixture as requests_mock_fixture
import six import six
import testtools import testtools
from monascaclient.common import http
from monascaclient import exc from monascaclient import exc
import monascaclient.shell import monascaclient.shell
from monascaclient.tests import fakes from monascaclient.tests import fakes
@ -69,10 +69,10 @@ class ShellBase(TestCase):
def setUp(self): def setUp(self):
super(ShellBase, self).setUp() super(ShellBase, self).setUp()
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
self.m = mox.Mox() self.m = mox.Mox()
self.m.StubOutWithMock(ksclient, 'Client') self.m.StubOutWithMock(ksclient, 'Client')
self.m.StubOutWithMock(http.HTTPClient, 'json_request')
self.m.StubOutWithMock(http.HTTPClient, 'raw_request')
self.addCleanup(self.m.VerifyAll) self.addCleanup(self.m.VerifyAll)
self.addCleanup(self.m.UnsetStubs) self.addCleanup(self.m.UnsetStubs)
@ -150,6 +150,18 @@ class ShellTestMonascaCommands(ShellBase):
super(ShellTestMonascaCommands, self).setUp() super(ShellTestMonascaCommands, self).setUp()
self._set_fake_env() self._set_fake_env()
def assertHeaders(self, req=None, **kwargs):
if not req:
req = self.requests_mock.last_request
self.assertEqual('password', req.headers['X-Auth-Key'])
self.assertEqual('username', req.headers['X-Auth-User'])
self.assertEqual('abcd1234', req.headers['X-Auth-Token'])
self.assertEqual('python-monascaclient', req.headers['User-Agent'])
for k, v in kwargs.items():
self.assertEqual(v, req.headers[k])
def _set_fake_env(self): def _set_fake_env(self):
fake_env = { fake_env = {
'OS_USERNAME': 'username', 'OS_USERNAME': 'username',
@ -174,24 +186,13 @@ class ShellTestMonascaCommands(ShellBase):
def test_good_metrics_create_subcommand(self): def test_good_metrics_create_subcommand(self):
self._script_keystone_client() self._script_keystone_client()
resp = fakes.FakeHTTPResponse(
204,
'Created',
{'location': 'http://no.where/v2.0/metrics'},
None)
http.HTTPClient.json_request(
'POST',
'/metrics',
data={'timestamp': 1395691090,
'name': 'metric1',
'value': 123.0},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp,
None))
self.m.ReplayAll() self.m.ReplayAll()
headers = {'location': 'http://no.where/v2.0/metrics'}
self.requests_mock.post('http://192.168.1.5:8004/v1/f14b41234/metrics',
status_code=204,
headers=headers)
argstrings = [ argstrings = [
'metric-create metric1 123 --time 1395691090', 'metric-create metric1 123 --time 1395691090',
] ]
@ -199,6 +200,13 @@ class ShellTestMonascaCommands(ShellBase):
retvalue = self.shell(argstr) retvalue = self.shell(argstr)
self.assertRegexpMatches(retvalue, "^Success") self.assertRegexpMatches(retvalue, "^Success")
data = {'timestamp': 1395691090,
'name': 'metric1',
'value': 123.0}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_bad_notifications_create_missing_args_subcommand(self): def test_bad_notifications_create_missing_args_subcommand(self):
argstrings = [ argstrings = [
'notification-create email1 metric1@hp.com', 'notification-create email1 metric1@hp.com',
@ -209,23 +217,16 @@ class ShellTestMonascaCommands(ShellBase):
def test_good_notifications_create_subcommand(self): def test_good_notifications_create_subcommand(self):
self._script_keystone_client() self._script_keystone_client()
resp = fakes.FakeHTTPResponse(
201,
'Created',
{'location': 'http://no.where/v2.0/notification-methods'},
None)
http.HTTPClient.json_request(
'POST',
'/notification-methods',
data={'name': 'email1',
'type': 'EMAIL',
'address': 'john.doe@hp.com'},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp, 'id'))
self.m.ReplayAll() self.m.ReplayAll()
url = 'http://192.168.1.5:8004/v1/f14b41234/notification-methods'
headers = {'location': 'http://no.where/v2.0/notification-methods',
'Content-Type': 'application/json'}
self.requests_mock.post(url,
status_code=201,
headers=headers,
json='id')
argstrings = [ argstrings = [
'notification-create email1 EMAIL john.doe@hp.com', 'notification-create email1 EMAIL john.doe@hp.com',
] ]
@ -233,25 +234,25 @@ class ShellTestMonascaCommands(ShellBase):
retvalue = self.shell(argstr) retvalue = self.shell(argstr)
self.assertRegexpMatches(retvalue, "id") self.assertRegexpMatches(retvalue, "id")
data = {'name': 'email1',
'type': 'EMAIL',
'address': 'john.doe@hp.com'}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_good_notifications_create_subcommand_webhook(self): def test_good_notifications_create_subcommand_webhook(self):
self._script_keystone_client() self._script_keystone_client()
resp = fakes.FakeHTTPResponse(
201,
'Created',
{'location': 'http://no.where/v2.0/notification-methods'},
None)
http.HTTPClient.json_request(
'POST',
'/notification-methods',
data={'name': 'mypost',
'type': 'WEBHOOK',
'address': 'http://localhost:8080'},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp, 'id'))
self.m.ReplayAll() self.m.ReplayAll()
url = 'http://192.168.1.5:8004/v1/f14b41234/notification-methods'
headers = {'location': 'http://no.where/v2.0/notification-methods',
'Content-Type': 'application/json'}
self.requests_mock.post(url,
status_code=201,
headers=headers,
json='id')
argstrings = [ argstrings = [
'notification-create mypost WEBHOOK http://localhost:8080', 'notification-create mypost WEBHOOK http://localhost:8080',
] ]
@ -259,71 +260,92 @@ class ShellTestMonascaCommands(ShellBase):
retvalue = self.shell(argstr) retvalue = self.shell(argstr)
self.assertRegexpMatches(retvalue, "id") self.assertRegexpMatches(retvalue, "id")
data = {'name': 'mypost',
'type': 'WEBHOOK',
'address': 'http://localhost:8080'}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_good_notifications_patch(self): def test_good_notifications_patch(self):
self._script_keystone_client() self._script_keystone_client()
self.m.ReplayAll()
id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96490' id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96490'
resp = fakes.FakeHTTPResponse( url = 'http://192.168.1.5:8004/v1/f14b41234/notification-methods/'
201, headers = {'location': 'http://no.where/v2.0/notification-methods',
'Created', 'Content-Type': 'application/json'}
{'location': 'http://no.where/v2.0/notification-methods'},
None) self.requests_mock.patch(url + id_str,
http.HTTPClient.json_request( status_code=201,
'PATCH', headers=headers,
'/notification-methods/' + id_str, json='id')
data={'type': 'EMAIL',
'address': 'john.doe@hpe.com',
'period': 0},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp, 'id'))
self.m.ReplayAll()
argstring = 'notification-patch {0} --type EMAIL --address' \ argstring = 'notification-patch {0} --type EMAIL --address' \
' john.doe@hpe.com --period 0'.format(id_str) ' john.doe@hpe.com --period 0'.format(id_str)
retvalue = self.shell(argstring) retvalue = self.shell(argstring)
self.assertRegexpMatches(retvalue, "id") self.assertRegexpMatches(retvalue, "id")
data = {'type': 'EMAIL',
'address': 'john.doe@hpe.com',
'period': 0}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_bad_notifications_patch(self): def test_bad_notifications_patch(self):
self._script_keystone_client() self._script_keystone_client()
self.m.ReplayAll()
id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96490' id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96490'
argstring = 'notification-patch {0} --type EMAIL --address' \ argstring = 'notification-patch {0} --type EMAIL --address' \
' john.doe@hpe.com --period 60'.format(id_str) ' john.doe@hpe.com --period 60'.format(id_str)
self.m.ReplayAll()
retvalue = self.shell(argstring) retvalue = self.shell(argstring)
self.assertRegexpMatches(retvalue, "^Invalid") self.assertRegexpMatches(retvalue, "^Invalid")
def test_good_notifications_update(self): def test_good_notifications_update(self):
self._script_keystone_client() self._script_keystone_client()
self.m.ReplayAll()
id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96491' id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96491'
resp = fakes.FakeHTTPResponse( url = 'http://192.168.1.5:8004/v1/f14b41234/notification-methods/'
201, headers = {'location': 'http://no.where/v2.0/notification-methods',
'Created', 'Content-Type': 'application/json'}
{'location': 'http://no.where/v2.0/notification-methods'},
None) self.requests_mock.put(url + id_str,
http.HTTPClient.json_request( status_code=201,
'PUT', headers=headers,
'/notification-methods/' + id_str, json='id')
data={'name': 'notification_updated_name',
'type': 'EMAIL',
'address': 'john.doe@hpe.com',
'period': 0},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp, 'id'))
self.m.ReplayAll()
argstring = 'notification-update {0} notification_updated_name ' \ argstring = 'notification-update {0} notification_updated_name ' \
'EMAIL john.doe@hpe.com 0'.format(id_str) 'EMAIL john.doe@hpe.com 0'.format(id_str)
retvalue = self.shell(argstring) retvalue = self.shell(argstring)
self.assertRegexpMatches(retvalue, "id") self.assertRegexpMatches(retvalue, "id")
data = {'name': 'notification_updated_name',
'type': 'EMAIL',
'address': 'john.doe@hpe.com',
'period': 0}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_good_alarm_definition_update(self): def test_good_alarm_definition_update(self):
self._script_keystone_client() self._script_keystone_client()
self.m.ReplayAll()
id_str = '0495340b-58fd-4e1c-932b-5e6f9cc96490'
url = 'http://192.168.1.5:8004/v1/f14b41234/alarm-definitions/'
headers = {'location': 'http://no.where/v2.0/notification-methods',
'Content-Type': 'application/json'}
self.requests_mock.put(url + id_str,
status_code=201,
headers=headers,
json='id')
cmd = 'alarm-definition-update' cmd = 'alarm-definition-update'
id = '0495340b-58fd-4e1c-932b-5e6f9cc96490'
name = 'alarm_name' name = 'alarm_name'
description = 'test_alarm_definition' description = 'test_alarm_definition'
expression = 'avg(Test_Metric_1)>=10' expression = 'avg(Test_Metric_1)>=10'
@ -331,51 +353,38 @@ class ShellTestMonascaCommands(ShellBase):
enabled = 'True' enabled = 'True'
match_by = 'hostname' match_by = 'hostname'
severity = 'CRITICAL' severity = 'CRITICAL'
resp = fakes.FakeHTTPResponse(
201,
'Created',
{'location': 'http://no.where/v2.0/notification-methods'},
None)
http.HTTPClient.json_request(
'PUT',
'/alarm-definitions/' + id,
data={'name': name,
'description': description,
'expression': expression,
'alarm_actions': [notif_id],
'undetermined_actions': [notif_id],
'ok_actions': [notif_id],
'match_by': [match_by],
'actions_enabled': bool(enabled),
'severity': severity
},
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn((resp, 'id'))
self.m.ReplayAll() args = [cmd, id_str, name, description, expression, notif_id,
args = [cmd, id, name, description, expression, notif_id,
notif_id, notif_id, enabled, match_by, severity] notif_id, notif_id, enabled, match_by, severity]
argstring = " ".join(args) argstring = " ".join(args)
retvalue = self.shell(argstring) retvalue = self.shell(argstring)
self.assertRegexpMatches(retvalue, "id") self.assertRegexpMatches(retvalue, "id")
data = {'name': name,
'description': description,
'expression': expression,
'alarm_actions': [notif_id],
'undetermined_actions': [notif_id],
'ok_actions': [notif_id],
'match_by': [match_by],
'actions_enabled': bool(enabled),
'severity': severity}
self.assertHeaders()
self.assertEqual(data, self.requests_mock.last_request.json())
def test_notifications_types_list(self): def test_notifications_types_list(self):
self._script_keystone_client() self._script_keystone_client()
resp_body = [{"type": "WEBHOOK"}, {"type": "EMAIL"}, {"type": "PAGERDUTY"}]
resp = fakes.FakeHTTPResponse(
status_code=200,
content=resp_body)
http.HTTPClient.json_request(
'GET',
'/notification-methods/types',
headers={'X-Auth-Key': 'password',
'X-Auth-User': 'username'}).AndReturn(((resp, resp_body)))
self.m.ReplayAll() self.m.ReplayAll()
url = 'http://192.168.1.5:8004/v1/f14b41234/notification-methods/'
headers = {'Content-Type': 'application/json'}
body = [{"type": "WEBHOOK"}, {"type": "EMAIL"}, {"type": "PAGERDUTY"}]
self.requests_mock.get(url + 'types', headers=headers, json=body)
argstrings = ["notification-type-list"] argstrings = ["notification-type-list"]
retvalue = self.shell("".join(argstrings)) retvalue = self.shell("".join(argstrings))
self.assertRegexpMatches(retvalue, "types") self.assertRegexpMatches(retvalue, "types")
self.assertHeaders()

View File

@ -6,6 +6,7 @@ fixtures>=3.0.0 # Apache-2.0/BSD
hacking<0.12,>=0.11.0 # Apache-2.0 hacking<0.12,>=0.11.0 # Apache-2.0
mock>=2.0 # BSD mock>=2.0 # BSD
mox3>=0.7.0 # Apache-2.0 mox3>=0.7.0 # Apache-2.0
requests-mock>=1.1 # Apache-2.0
sphinx!=1.3b1,<1.4,>=1.2.1 # BSD sphinx!=1.3b1,<1.4,>=1.2.1 # BSD
testrepository>=0.0.18 # Apache-2.0/BSD testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD