http/https check rules as stevedore extensions

Why? HttpCheck/HttpsCheck are examples of rule checks
that can be implemented outside of the oslo.policy
library. Once we setup the infra for registering and
using these as stevedore extensions, we automatically
get the capability of other folks contributing to
writing custom rules for their own use cases.

* Add HttpCheck/HttpsCheck as entrypoints in setup.cfg
* parser will check get_extensions() to see if there
  are any external checks registered
* Move HttpCheck/HttpsCheck into external module
* Move related test cases to test_external.py

Change-Id: Icde2b26a38d7c7842defae053228d9208454b969
This commit is contained in:
Davanum Srinivas 2017-09-25 09:25:53 -04:00
parent 89d226916c
commit 7e185ad96b
7 changed files with 445 additions and 403 deletions

View File

@ -17,19 +17,25 @@
import abc
import ast
import contextlib
import copy
import inspect
import os
from oslo_serialization import jsonutils
import requests
import six
from oslo_policy._i18n import _
import stevedore
registered_checks = {}
extension_checks = None
def get_extensions():
global extension_checks
if extension_checks is None:
em = stevedore.ExtensionManager('oslo.policy.rule_checks',
invoke_on_load=False)
extension_checks = {
extension.name: extension.plugin
for extension in em
}
return extension_checks
def _check(rule, target, creds, enforcer, current_rule):
@ -276,107 +282,6 @@ class RoleCheck(Check):
return False
@register('http')
class HttpCheck(Check):
"""Check ``http:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('http:' + self.match) % target
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = json = None
if (enforcer.conf.oslo_policy.remote_content_type ==
'application/x-www-form-urlencoded'):
data = {'rule': jsonutils.dumps(current_rule),
'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
else:
json = {'rule': current_rule,
'target': temp_target,
'credentials': creds}
with contextlib.closing(requests.post(url, json=json, data=data)) as r:
return r.text.lstrip('"').rstrip('"') == 'True'
@register('https')
class HttpsCheck(Check):
"""Check ``https:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('https:' + self.match) % target
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
if cert_file:
if not os.path.exists(cert_file):
raise RuntimeError(
_("Unable to find ssl cert_file : %s") % cert_file)
if not os.access(cert_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl cert_file : %s") % cert_file)
if key_file:
if not os.path.exists(key_file):
raise RuntimeError(
_("Unable to find ssl key_file : %s") % key_file)
if not os.access(key_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl key_file : %s") % key_file)
cert = (cert_file, key_file)
if verify_server:
if ca_crt_file:
if not os.path.exists(ca_crt_file):
raise RuntimeError(
_("Unable to find ca cert_file : %s") % ca_crt_file)
verify_server = ca_crt_file
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = json = None
if (enforcer.conf.oslo_policy.remote_content_type ==
'application/x-www-form-urlencoded'):
data = {'rule': jsonutils.dumps(current_rule),
'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
else:
json = {'rule': current_rule,
'target': temp_target,
'credentials': creds}
with contextlib.closing(
requests.post(url, json=json,
data=data, cert=cert,
verify=verify_server)
) as r:
return r.text.lstrip('"').rstrip('"') == 'True'
@register(None)
class GenericCheck(Check):
"""Check an individual match.

111
oslo_policy/_external.py Normal file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import os
from oslo_policy import _checks
from oslo_policy._i18n import _
from oslo_serialization import jsonutils
import requests
class HttpCheck(_checks.Check):
"""Check ``http:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('http:' + self.match) % target
data, json = self._construct_payload(creds, current_rule,
enforcer, target)
with contextlib.closing(
requests.post(url, json=json, data=data)
) as r:
return r.text.lstrip('"').rstrip('"') == 'True'
@staticmethod
def _construct_payload(creds, current_rule, enforcer, target):
# Convert instances of object() in target temporarily to
# empty dict to avoid circular reference detection
# errors in jsonutils.dumps().
temp_target = copy.deepcopy(target)
for key in target.keys():
element = target.get(key)
if type(element) is object:
temp_target[key] = {}
data = json = None
if (enforcer.conf.oslo_policy.remote_content_type ==
'application/x-www-form-urlencoded'):
data = {'rule': jsonutils.dumps(current_rule),
'target': jsonutils.dumps(temp_target),
'credentials': jsonutils.dumps(creds)}
else:
json = {'rule': current_rule,
'target': temp_target,
'credentials': creds}
return data, json
class HttpsCheck(HttpCheck):
"""Check ``https:`` rules by calling to a remote server.
This example implementation simply verifies that the response
is exactly ``True``.
"""
def __call__(self, target, creds, enforcer, current_rule=None):
url = ('https:' + self.match) % target
cert_file = enforcer.conf.oslo_policy.remote_ssl_client_crt_file
key_file = enforcer.conf.oslo_policy.remote_ssl_client_key_file
ca_crt_file = enforcer.conf.oslo_policy.remote_ssl_ca_crt_file
verify_server = enforcer.conf.oslo_policy.remote_ssl_verify_server_crt
if cert_file:
if not os.path.exists(cert_file):
raise RuntimeError(
_("Unable to find ssl cert_file : %s") % cert_file)
if not os.access(cert_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl cert_file : %s") % cert_file)
if key_file:
if not os.path.exists(key_file):
raise RuntimeError(
_("Unable to find ssl key_file : %s") % key_file)
if not os.access(key_file, os.R_OK):
raise RuntimeError(
_("Unable to access ssl key_file : %s") % key_file)
cert = (cert_file, key_file)
if verify_server:
if ca_crt_file:
if not os.path.exists(ca_crt_file):
raise RuntimeError(
_("Unable to find ca cert_file : %s") % ca_crt_file)
verify_server = ca_crt_file
data, json = self._construct_payload(creds, current_rule,
enforcer, target)
with contextlib.closing(
requests.post(url, json=json,
data=data, cert=cert,
verify=verify_server)
) as r:
return r.text.lstrip('"').rstrip('"') == 'True'

View File

@ -216,7 +216,10 @@ def _parse_check(rule):
return _checks.FalseCheck()
# Find what implements the check
if kind in _checks.registered_checks:
extension_checks = _checks.get_extensions()
if kind in extension_checks:
return extension_checks[kind](kind, match)
elif kind in _checks.registered_checks:
return _checks.registered_checks[kind](kind, match)
elif None in _checks.registered_checks:
return _checks.registered_checks[None](kind, match)

View File

@ -36,7 +36,7 @@ class HttpCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
'oslo_policy._checks.HttpCheck.__call__',
'oslo_policy._external.HttpCheck.__call__',
mocked_call,
)
)
@ -63,7 +63,7 @@ class HttpsCheckFixture(fixtures.Fixture):
self.useFixture(
fixtures.MonkeyPatch(
'oslo_policy._checks.HttpCheck.__call__',
'oslo_policy._external.HttpsCheck.__call__',
mocked_call,
)
)

View File

@ -13,15 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from oslo_serialization import jsonutils
from oslotest import base as test_base
from requests_mock.contrib import fixture as rm_fixture
import six.moves.urllib.parse as urlparse
from oslo_policy import _checks
from oslo_policy import opts
from oslo_policy.tests import base
from oslo_policy.tests import token_fixture
@ -96,292 +91,6 @@ class RoleCheckTestCase(base.PolicyBaseTestCase):
self.assertFalse(check({}, {}, self.enforcer))
class HttpCheckTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(HttpCheckTestCase, self).setUp()
opts._register(self.conf)
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
def test_accept(self):
self.requests_mock.post('http://example.com/target', text='True')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/x-www-form-urlencoded',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=None),
self.decode_post_data(last_request.body))
def test_accept_json(self):
self.conf.set_override('remote_content_type', 'application/json',
group='oslo_policy')
self.requests_mock.post('http://example.com/target', text='True')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/json',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
credentials=cred_dict,
target=target_dict),
json.loads(last_request.body.decode('utf-8')))
def test_reject(self):
self.requests_mock.post("http://example.com/target", text='other')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=None),
self.decode_post_data(last_request.body))
def test_http_with_objects_in_target(self):
self.requests_mock.post("http://example.com/target", text='True')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_http_with_strings_in_target(self):
self.requests_mock.post("http://example.com/target", text='True')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_accept_with_rule_in_argument(self):
self.requests_mock.post('http://example.com/target', text='True')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
current_rule = "a_rule"
self.assertTrue(check(target_dict, cred_dict, self.enforcer,
current_rule))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=current_rule),
self.decode_post_data(last_request.body))
def test_reject_with_rule_in_argument(self):
self.requests_mock.post("http://example.com/target", text='other')
check = _checks.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
current_rule = "a_rule"
self.assertFalse(check(target_dict, cred_dict, self.enforcer,
current_rule))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=current_rule),
self.decode_post_data(last_request.body))
class HttpsCheckTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(HttpsCheckTestCase, self).setUp()
opts._register(self.conf)
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
def test_https_accept(self):
self.requests_mock.post('https://example.com/target', text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/x-www-form-urlencoded',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_json(self):
self.conf.set_override('remote_content_type', 'application/json',
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/json',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
json.loads(last_request.body.decode('utf-8')))
def test_https_accept_with_verify(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual(True, last_request.verify)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_with_verify_cert(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
with mock.patch('os.path.exists') as path_exists:
path_exists.return_value = True
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('ca.crt', last_request.verify)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_with_verify_and_client_certs(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
group='oslo_policy')
self.conf.set_override('remote_ssl_client_key_file', "client.key",
group='oslo_policy')
self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
with mock.patch('os.path.exists') as path_exists:
with mock.patch('os.access') as os_access:
path_exists.return_value = True
os_access.return_value = True
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('ca.crt', last_request.verify)
self.assertEqual(('client.crt', 'client.key'), last_request.cert)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_reject(self):
self.requests_mock.post("https://example.com/target", text='other')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_with_objects_in_target(self):
self.requests_mock.post("https://example.com/target", text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_https_with_strings_in_target(self):
self.requests_mock.post("https://example.com/target", text='True')
check = _checks.HttpsCheck('https', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
class GenericCheckTestCase(base.PolicyBaseTestCase):
def test_no_cred(self):
check = _checks.GenericCheck('name', '%(name)s')

View File

@ -0,0 +1,310 @@
# Copyright (c) 2015 OpenStack Foundation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import mock
from oslo_serialization import jsonutils
from requests_mock.contrib import fixture as rm_fixture
import six.moves.urllib.parse as urlparse
from oslo_policy import _external
from oslo_policy import opts
from oslo_policy.tests import base
class HttpCheckTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(HttpCheckTestCase, self).setUp()
opts._register(self.conf)
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
def test_accept(self):
self.requests_mock.post('http://example.com/target', text='True')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/x-www-form-urlencoded',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=None),
self.decode_post_data(last_request.body))
def test_accept_json(self):
self.conf.set_override('remote_content_type', 'application/json',
group='oslo_policy')
self.requests_mock.post('http://example.com/target', text='True')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/json',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
credentials=cred_dict,
target=target_dict),
json.loads(last_request.body.decode('utf-8')))
def test_reject(self):
self.requests_mock.post("http://example.com/target", text='other')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=None),
self.decode_post_data(last_request.body))
def test_http_with_objects_in_target(self):
self.requests_mock.post("http://example.com/target", text='True')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_http_with_strings_in_target(self):
self.requests_mock.post("http://example.com/target", text='True')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_accept_with_rule_in_argument(self):
self.requests_mock.post('http://example.com/target', text='True')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
current_rule = "a_rule"
self.assertTrue(check(target_dict, cred_dict, self.enforcer,
current_rule))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=current_rule),
self.decode_post_data(last_request.body))
def test_reject_with_rule_in_argument(self):
self.requests_mock.post("http://example.com/target", text='other')
check = _external.HttpCheck('http', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
current_rule = "a_rule"
self.assertFalse(check(target_dict, cred_dict, self.enforcer,
current_rule))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(target=target_dict, credentials=cred_dict,
rule=current_rule),
self.decode_post_data(last_request.body))
class HttpsCheckTestCase(base.PolicyBaseTestCase):
def setUp(self):
super(HttpsCheckTestCase, self).setUp()
opts._register(self.conf)
self.requests_mock = self.useFixture(rm_fixture.Fixture())
def decode_post_data(self, post_data):
result = {}
for item in post_data.split('&'):
key, _sep, value = item.partition('=')
result[key] = jsonutils.loads(urlparse.unquote_plus(value))
return result
def test_https_accept(self):
self.requests_mock.post('https://example.com/target', text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/x-www-form-urlencoded',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_json(self):
self.conf.set_override('remote_content_type', 'application/json',
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('application/json',
last_request.headers['Content-Type'])
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
json.loads(last_request.body.decode('utf-8')))
def test_https_accept_with_verify(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual(True, last_request.verify)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_with_verify_cert(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
with mock.patch('os.path.exists') as path_exists:
path_exists.return_value = True
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('ca.crt', last_request.verify)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_accept_with_verify_and_client_certs(self):
self.conf.set_override('remote_ssl_verify_server_crt', True,
group='oslo_policy')
self.conf.set_override('remote_ssl_ca_crt_file', "ca.crt",
group='oslo_policy')
self.conf.set_override('remote_ssl_client_key_file', "client.key",
group='oslo_policy')
self.conf.set_override('remote_ssl_client_crt_file', "client.crt",
group='oslo_policy')
self.requests_mock.post('https://example.com/target', text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
with mock.patch('os.path.exists') as path_exists:
with mock.patch('os.access') as os_access:
path_exists.return_value = True
os_access.return_value = True
self.assertTrue(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('ca.crt', last_request.verify)
self.assertEqual(('client.crt', 'client.key'), last_request.cert)
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_reject(self):
self.requests_mock.post("https://example.com/target", text='other')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target_dict = dict(name='target', spam='spammer')
cred_dict = dict(user='user', roles=['a', 'b', 'c'])
self.assertFalse(check(target_dict, cred_dict, self.enforcer))
last_request = self.requests_mock.last_request
self.assertEqual('POST', last_request.method)
self.assertEqual(dict(rule=None,
target=target_dict,
credentials=cred_dict),
self.decode_post_data(last_request.body))
def test_https_with_objects_in_target(self):
self.requests_mock.post("https://example.com/target", text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target = {'a': object(),
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))
def test_https_with_strings_in_target(self):
self.requests_mock.post("https://example.com/target", text='True')
check = _external.HttpsCheck('https', '//example.com/%(name)s')
target = {'a': 'some_string',
'name': 'target',
'b': 'test data'}
self.assertTrue(check(target,
dict(user='user', roles=['a', 'b', 'c']),
self.enforcer))

View File

@ -39,6 +39,10 @@ console_scripts =
oslopolicy-policy-generator = oslo_policy.generator:generate_policy
oslopolicy-list-redundant = oslo_policy.generator:list_redundant
oslo.policy.rule_checks =
http = oslo_policy._external:HttpCheck
https = oslo_policy._external:HttpsCheck
[build_sphinx]
all-files = 1
warning-is-error = 1