Add RBAC enforcement to L7 policies v2 API
This patch adds policies and enforcement to the Octavia v2 API for L7 policies. Change-Id: Ie4de79df3f6f7a6c46c00d2e224979a8e25e9712 Partial-Bug: #1690481
This commit is contained in:
parent
aea4f266ee
commit
accf9456cc
|
@ -47,6 +47,13 @@ class L7PolicyController(base.BaseController):
|
|||
"""Gets a single l7policy's details."""
|
||||
context = pecan.request.context.get('octavia_context')
|
||||
db_l7policy = self._get_db_l7policy(context.session, id)
|
||||
|
||||
# Check that the user is authorized to show this l7policy
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='get_one')
|
||||
target = {'project_id': db_l7policy.project_id}
|
||||
context.policy.authorize(action, target)
|
||||
|
||||
result = self._convert_db_to_type(db_l7policy,
|
||||
l7policy_types.L7PolicyResponse)
|
||||
return l7policy_types.L7PolicyRootResponse(l7policy=result)
|
||||
|
@ -57,17 +64,31 @@ class L7PolicyController(base.BaseController):
|
|||
"""Lists all l7policies of a listener."""
|
||||
pcontext = pecan.request.context
|
||||
context = pcontext.get('octavia_context')
|
||||
if context.is_admin or CONF.auth_strategy == constants.NOAUTH:
|
||||
if project_id:
|
||||
project_id = {'project_id': project_id}
|
||||
else:
|
||||
project_id = {}
|
||||
|
||||
# Check if user is authorized to list l7policies under all projects
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='get_all-global')
|
||||
target = {'project_id': project_id}
|
||||
if not context.policy.authorize(action, target, do_raise=False):
|
||||
# Not a global observer or admin
|
||||
if project_id is None:
|
||||
project_id = context.project_id
|
||||
|
||||
# Check if user is authorized to list l7policies under this project
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='get_all')
|
||||
target = {'project_id': project_id}
|
||||
context.policy.authorize(action, target)
|
||||
|
||||
if project_id is None:
|
||||
query_filter = {}
|
||||
else:
|
||||
project_id = {'project_id': context.project_id}
|
||||
query_filter = {'project_id': project_id}
|
||||
|
||||
db_l7policies, links = self.repositories.l7policy.get_all(
|
||||
context.session, show_deleted=False,
|
||||
pagination_helper=pcontext.get(constants.PAGINATION_HELPER),
|
||||
**project_id)
|
||||
**query_filter)
|
||||
result = self._convert_db_to_type(
|
||||
db_l7policies, [l7policy_types.L7PolicyResponse])
|
||||
return l7policy_types.L7PoliciesRootResponse(
|
||||
|
@ -150,6 +171,12 @@ class L7PolicyController(base.BaseController):
|
|||
load_balancer_id = listener.load_balancer_id
|
||||
l7policy.project_id = listener.project_id
|
||||
|
||||
# Check that the user is authorized to create under this project
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='post')
|
||||
target = {'project_id': l7policy.project_id}
|
||||
context.policy.authorize(action, target)
|
||||
|
||||
lock_session = db_api.get_session(autocommit=False)
|
||||
if self.repositories.check_quota_met(
|
||||
context.session,
|
||||
|
@ -214,6 +241,13 @@ class L7PolicyController(base.BaseController):
|
|||
db_l7policy = self._get_db_l7policy(context.session, id)
|
||||
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
|
||||
db_l7policy)
|
||||
|
||||
# Check that the user is authorized to update this l7policy
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='put')
|
||||
target = {'project_id': db_l7policy.project_id}
|
||||
context.policy.authorize(action, target)
|
||||
|
||||
self._test_lb_and_listener_statuses(context.session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
|
@ -247,6 +281,13 @@ class L7PolicyController(base.BaseController):
|
|||
db_l7policy = self._get_db_l7policy(context.session, id)
|
||||
load_balancer_id, listener_id = self._get_listener_and_loadbalancer_id(
|
||||
db_l7policy)
|
||||
|
||||
# Check that the user is authorized to delete this pool
|
||||
action = '{rbac_obj}{action}'.format(
|
||||
rbac_obj=constants.RBAC_L7POLICY, action='delete')
|
||||
target = {'project_id': db_l7policy.project_id}
|
||||
context.policy.authorize(action, target)
|
||||
|
||||
self._test_lb_and_listener_statuses(context.session,
|
||||
lb_id=load_balancer_id,
|
||||
listener_ids=[listener_id])
|
||||
|
|
|
@ -433,3 +433,4 @@ RBAC_LISTENER = '{}:listener:'.format(LOADBALANCER_API)
|
|||
RBAC_POOL = '{}:pool:'.format(LOADBALANCER_API)
|
||||
RBAC_MEMBER = '{}:member:'.format(LOADBALANCER_API)
|
||||
RBAC_HEALTHMONITOR = '{}:healthmonitor:'.format(LOADBALANCER_API)
|
||||
RBAC_L7POLICY = '{}:l7policy:'.format(LOADBALANCER_API)
|
||||
|
|
|
@ -15,6 +15,7 @@ import itertools
|
|||
|
||||
from octavia.policies import base
|
||||
from octavia.policies import healthmonitor
|
||||
from octavia.policies import l7policy
|
||||
from octavia.policies import listener
|
||||
from octavia.policies import loadbalancer
|
||||
from octavia.policies import member
|
||||
|
@ -25,6 +26,7 @@ def list_rules():
|
|||
return itertools.chain(
|
||||
base.list_rules(),
|
||||
healthmonitor.list_rules(),
|
||||
l7policy.list_rules(),
|
||||
listener.list_rules(),
|
||||
loadbalancer.list_rules(),
|
||||
member.list_rules(),
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
# Copyright 2017 Rackspace, US Inc.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from octavia.common import constants
|
||||
from oslo_policy import policy
|
||||
|
||||
rules = [
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='get_all'),
|
||||
constants.RULE_API_READ,
|
||||
"List L7 Policys",
|
||||
[{'method': 'GET', 'path': '/v2.0/lbaas/l7policies'}]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='get_all-global'),
|
||||
constants.RULE_API_READ_GLOBAL,
|
||||
"List L7 Policys including resources owned by others",
|
||||
[{'method': 'GET', 'path': '/v2.0/lbaas/l7policies'}]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='post'),
|
||||
constants.RULE_API_WRITE,
|
||||
"Create a L7 Policy",
|
||||
[{'method': 'POST', 'path': '/v2.0/lbaas/l7policies'}]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='get_one'),
|
||||
constants.RULE_API_READ,
|
||||
"Show L7 Policy details",
|
||||
[{'method': 'GET',
|
||||
'path': '/v2.0/lbaas/l7policies/{l7policy_id}'}]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='put'),
|
||||
constants.RULE_API_WRITE,
|
||||
"Update a L7 Policy",
|
||||
[{'method': 'PUT',
|
||||
'path': '/v2.0/lbaas/l7policies/{l7policy_id}'}]
|
||||
),
|
||||
policy.DocumentedRuleDefault(
|
||||
'{rbac_obj}{action}'.format(rbac_obj=constants.RBAC_L7POLICY,
|
||||
action='delete'),
|
||||
constants.RULE_API_WRITE,
|
||||
"Remove a L7 Policy",
|
||||
[{'method': 'DELETE',
|
||||
'path': '/v2.0/lbaas/l7policies/{l7policy_id}'}]
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def list_rules():
|
||||
return rules
|
|
@ -251,12 +251,13 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
|
|||
response = self.post(path, body, **status)
|
||||
return response.json
|
||||
|
||||
def create_l7policy(self, listener_id, action, **optionals):
|
||||
def create_l7policy(self, listener_id, action, status=None, **optionals):
|
||||
req_dict = {'listener_id': listener_id, 'action': action}
|
||||
req_dict.update(optionals)
|
||||
body = {'l7policy': req_dict}
|
||||
path = self.L7POLICIES_PATH
|
||||
response = self.post(path, body)
|
||||
status = {'status': status} if status else {}
|
||||
response = self.post(path, body, **status)
|
||||
return response.json
|
||||
|
||||
def create_l7rule(self, l7policy_id, type, compare_type,
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_config import fixture as oslo_fixture
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.common import constants
|
||||
|
@ -32,6 +34,7 @@ class TestL7Policy(base.BaseAPITest):
|
|||
super(TestL7Policy, self).setUp()
|
||||
self.lb = self.create_load_balancer(uuidutils.generate_uuid())
|
||||
self.lb_id = self.lb.get('loadbalancer').get('id')
|
||||
self.project_id = self.lb.get('loadbalancer').get('project_id')
|
||||
self.set_lb_status(self.lb_id)
|
||||
self.listener = self.create_listener(
|
||||
constants.PROTOCOL_HTTP, 80, lb_id=self.lb_id)
|
||||
|
@ -52,6 +55,53 @@ class TestL7Policy(base.BaseAPITest):
|
|||
l7policy_id=api_l7policy.get('id'))).json.get(self.root_tag)
|
||||
self.assertEqual(api_l7policy, response)
|
||||
|
||||
def test_get_authorized(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_member'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
response = self.get(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')))
|
||||
response = response.json.get(self.root_tag)
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(api_l7policy, response)
|
||||
|
||||
def test_get_not_authorized(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
uuidutils.generate_uuid()):
|
||||
response = self.get(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')), status=401)
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
|
||||
|
||||
def test_get_hides_deleted(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
|
@ -147,8 +197,24 @@ class TestL7Policy(base.BaseAPITest):
|
|||
self.conf.config(auth_strategy=constants.KEYSTONE)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
api_l7p_c.get('project_id')):
|
||||
policies = self.get(
|
||||
self.L7POLICIES_PATH).json.get(self.root_tag_list)
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_member'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
policies = self.get(
|
||||
self.L7POLICIES_PATH).json.get(self.root_tag_list)
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
|
||||
self.assertEqual(1, len(policies))
|
||||
|
@ -156,6 +222,81 @@ class TestL7Policy(base.BaseAPITest):
|
|||
self.assertIn((api_l7p_c.get('id'), api_l7p_c.get('action')),
|
||||
policy_id_actions)
|
||||
|
||||
def test_get_all_non_admin_global_observer(self):
|
||||
project_id = uuidutils.generate_uuid()
|
||||
lb1 = self.create_load_balancer(uuidutils.generate_uuid(), name='lb1',
|
||||
project_id=project_id)
|
||||
lb1_id = lb1.get('loadbalancer').get('id')
|
||||
self.set_lb_status(lb1_id)
|
||||
listener1 = self.create_listener(constants.PROTOCOL_HTTP, 80,
|
||||
lb1_id)
|
||||
listener1_id = listener1.get('listener').get('id')
|
||||
self.set_lb_status(lb1_id)
|
||||
pool1 = self.create_pool(lb1_id, constants.PROTOCOL_HTTP,
|
||||
constants.LB_ALGORITHM_ROUND_ROBIN)
|
||||
pool1_id = pool1.get('pool').get('id')
|
||||
self.set_lb_status(lb1_id)
|
||||
api_l7p_a = self.create_l7policy(
|
||||
listener1_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
self.set_lb_status(lb1_id)
|
||||
api_l7p_b = self.create_l7policy(
|
||||
listener1_id, constants.L7POLICY_ACTION_REDIRECT_TO_POOL,
|
||||
position=2, redirect_pool_id=pool1_id).get(self.root_tag)
|
||||
self.set_lb_status(lb1_id)
|
||||
api_l7p_c = self.create_l7policy(
|
||||
self.listener_id, constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
redirect_url='http://localhost/').get(self.root_tag)
|
||||
self.set_lb_status(lb1_id)
|
||||
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.KEYSTONE)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
api_l7p_c.get('project_id')):
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_global_observer'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
policies = self.get(
|
||||
self.L7POLICIES_PATH).json.get(self.root_tag_list)
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
|
||||
self.assertEqual(3, len(policies))
|
||||
policy_id_actions = [(p.get('id'), p.get('action')) for p in policies]
|
||||
self.assertIn((api_l7p_a.get('id'), api_l7p_a.get('action')),
|
||||
policy_id_actions)
|
||||
self.assertIn((api_l7p_b.get('id'), api_l7p_b.get('action')),
|
||||
policy_id_actions)
|
||||
self.assertIn((api_l7p_c.get('id'), api_l7p_c.get('action')),
|
||||
policy_id_actions)
|
||||
|
||||
def test_get_all_not_authorized(self):
|
||||
self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
self.set_lb_status(self.lb_id)
|
||||
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
uuidutils.generate_uuid()):
|
||||
policies = self.get(self.L7POLICIES_PATH, status=401).json
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(self.NOT_AUTHORIZED_BODY, policies)
|
||||
|
||||
def test_get_by_project_id(self):
|
||||
project1_id = uuidutils.generate_uuid()
|
||||
project2_id = uuidutils.generate_uuid()
|
||||
|
@ -309,6 +450,62 @@ class TestL7Policy(base.BaseAPITest):
|
|||
l7policy_prov_status=constants.PENDING_CREATE,
|
||||
l7policy_op_status=constants.OFFLINE)
|
||||
|
||||
def test_create_policy_authorized(self):
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_member'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REJECT,
|
||||
api_l7policy['action'])
|
||||
self.assertEqual(1, api_l7policy['position'])
|
||||
self.assertIsNone(api_l7policy['redirect_pool_id'])
|
||||
self.assertIsNone(api_l7policy['redirect_url'])
|
||||
self.assertTrue(api_l7policy['admin_state_up'])
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
lb_prov_status=constants.PENDING_UPDATE,
|
||||
listener_prov_status=constants.PENDING_UPDATE,
|
||||
l7policy_prov_status=constants.PENDING_CREATE,
|
||||
l7policy_op_status=constants.OFFLINE)
|
||||
|
||||
def test_create_policy_not_authorized(self):
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT, status=401)
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(self.NOT_AUTHORIZED_BODY, api_l7policy)
|
||||
|
||||
def test_create_redirect_to_pool(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id, constants.L7POLICY_ACTION_REDIRECT_TO_POOL,
|
||||
|
@ -402,6 +599,77 @@ class TestL7Policy(base.BaseAPITest):
|
|||
listener_prov_status=constants.PENDING_UPDATE,
|
||||
l7policy_prov_status=constants.PENDING_UPDATE)
|
||||
|
||||
def test_update_authorized(self):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
self.set_lb_status(self.lb_id)
|
||||
new_l7policy = {
|
||||
'action': constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
'redirect_url': 'http://www.example.com'}
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_member'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
response = self.put(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')),
|
||||
self._build_body(new_l7policy)).json.get(self.root_tag)
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(constants.L7POLICY_ACTION_REJECT,
|
||||
response.get('action'))
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
lb_prov_status=constants.PENDING_UPDATE,
|
||||
listener_prov_status=constants.PENDING_UPDATE,
|
||||
l7policy_prov_status=constants.PENDING_UPDATE)
|
||||
|
||||
def test_update_not_authorized(self):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
).get(self.root_tag)
|
||||
self.set_lb_status(self.lb_id)
|
||||
new_l7policy = {
|
||||
'action': constants.L7POLICY_ACTION_REDIRECT_TO_URL,
|
||||
'redirect_url': 'http://www.example.com'}
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
response = self.put(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')),
|
||||
self._build_body(new_l7policy), status=401)
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assertEqual(self.NOT_AUTHORIZED_BODY, response.json)
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
lb_prov_status=constants.ACTIVE,
|
||||
listener_prov_status=constants.ACTIVE,
|
||||
l7policy_prov_status=constants.ACTIVE)
|
||||
|
||||
def test_bad_update(self):
|
||||
api_l7policy = self.create_l7policy(self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT,
|
||||
|
@ -514,6 +782,85 @@ class TestL7Policy(base.BaseAPITest):
|
|||
listener_prov_status=constants.PENDING_UPDATE,
|
||||
l7policy_prov_status=constants.PENDING_DELETE)
|
||||
|
||||
def test_delete_authorized(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
self.set_lb_status(self.lb_id)
|
||||
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
|
||||
api_l7policy['provisioning_status'] = constants.ACTIVE
|
||||
api_l7policy['operating_status'] = constants.ONLINE
|
||||
api_l7policy.pop('updated_at')
|
||||
|
||||
response = self.get(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id'))).json.get(self.root_tag)
|
||||
response.pop('updated_at')
|
||||
self.assertEqual(api_l7policy, response)
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
self.project_id):
|
||||
override_credentials = {
|
||||
'service_user_id': None,
|
||||
'user_domain_id': None,
|
||||
'is_admin_project': True,
|
||||
'service_project_domain_id': None,
|
||||
'service_project_id': None,
|
||||
'roles': ['load-balancer_member'],
|
||||
'user_id': None,
|
||||
'is_admin': False,
|
||||
'service_user_domain_id': None,
|
||||
'project_domain_id': None,
|
||||
'service_roles': [],
|
||||
'project_id': self.project_id}
|
||||
with mock.patch(
|
||||
"oslo_context.context.RequestContext.to_policy_values",
|
||||
return_value=override_credentials):
|
||||
|
||||
self.delete(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')))
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
lb_prov_status=constants.PENDING_UPDATE,
|
||||
listener_prov_status=constants.PENDING_UPDATE,
|
||||
l7policy_prov_status=constants.PENDING_DELETE)
|
||||
|
||||
def test_delete_not_authorized(self):
|
||||
api_l7policy = self.create_l7policy(
|
||||
self.listener_id,
|
||||
constants.L7POLICY_ACTION_REJECT).get(self.root_tag)
|
||||
self.set_lb_status(self.lb_id)
|
||||
# Set status to ACTIVE/ONLINE because set_lb_status did it in the db
|
||||
api_l7policy['provisioning_status'] = constants.ACTIVE
|
||||
api_l7policy['operating_status'] = constants.ONLINE
|
||||
api_l7policy.pop('updated_at')
|
||||
|
||||
response = self.get(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id'))).json.get(self.root_tag)
|
||||
response.pop('updated_at')
|
||||
self.assertEqual(api_l7policy, response)
|
||||
|
||||
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||
auth_strategy = self.conf.conf.get('auth_strategy')
|
||||
self.conf.config(auth_strategy=constants.TESTING)
|
||||
with mock.patch.object(octavia.common.context.Context, 'project_id',
|
||||
uuidutils.generate_uuid()):
|
||||
self.delete(self.L7POLICY_PATH.format(
|
||||
l7policy_id=api_l7policy.get('id')), status=401)
|
||||
|
||||
self.conf.config(auth_strategy=auth_strategy)
|
||||
self.assert_correct_status(
|
||||
lb_id=self.lb_id, listener_id=self.listener_id,
|
||||
l7policy_id=api_l7policy.get('id'),
|
||||
lb_prov_status=constants.ACTIVE,
|
||||
listener_prov_status=constants.ACTIVE,
|
||||
l7policy_prov_status=constants.ACTIVE)
|
||||
|
||||
def test_bad_delete(self):
|
||||
self.delete(self.L7POLICY_PATH.format(
|
||||
l7policy_id=uuidutils.generate_uuid()), status=404)
|
||||
|
|
Loading…
Reference in New Issue