policy: cleanup deprecation code to handle old extension:xxx rules

It served and warned users for enough time (since Icehouse) to be sure
everyone was notified about the need to update their policy file.

Change-Id: I240b935741e49fbf65c0b95715af04af4b2a73e7
This commit is contained in:
Ihar Hrachyshka 2015-04-23 14:03:52 +02:00
parent 734e77365b
commit 66fece4f84
2 changed files with 1 additions and 77 deletions

View File

@ -18,7 +18,6 @@ Policy engine for neutron. Largely copied from nova.
"""
import collections
import itertools
import logging as std_logging
import re
@ -30,7 +29,7 @@ import six
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.i18n import _LE, _LI, _LW
from neutron.i18n import _LE, _LW
from neutron.openstack.common import policy
@ -39,22 +38,6 @@ LOG = logging.getLogger(__name__)
_ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
# Maps deprecated 'extension' policies to new-style policies
DEPRECATED_POLICY_MAP = {
'extension:provider_network':
['network:provider:network_type',
'network:provider:physical_network',
'network:provider:segmentation_id'],
'extension:router':
['network:router:external'],
'extension:port_binding':
['port:binding:vif_type', 'port:binding:vif_details',
'port:binding:profile', 'port:binding:host_id']
}
DEPRECATED_ACTION_MAP = {
'view': ['get'],
'set': ['create', 'update']
}
def reset():
@ -95,35 +78,6 @@ def set_rules(policies, overwrite=True):
"""
LOG.debug("Loading policies from file: %s", _ENFORCER.policy_path)
# Ensure backward compatibility with folsom/grizzly convention
# for extension rules
for pol in policies.keys():
if any([pol.startswith(depr_pol) for depr_pol in
DEPRECATED_POLICY_MAP.keys()]):
LOG.warn(_LW("Found deprecated policy rule:%s. Please consider "
"upgrading your policy configuration file"), pol)
pol_name, action = pol.rsplit(':', 1)
try:
new_actions = DEPRECATED_ACTION_MAP[action]
new_policies = DEPRECATED_POLICY_MAP[pol_name]
# bind new actions and policies together
for actual_policy in ['_'.join(item) for item in
itertools.product(new_actions,
new_policies)]:
if actual_policy not in policies:
# New policy, same rule
LOG.info(_LI("Inserting policy:%(new_policy)s in "
"place of deprecated "
"policy:%(old_policy)s"),
{'new_policy': actual_policy,
'old_policy': pol})
policies[actual_policy] = policies[pol]
# Remove old-style policy
del policies[pol]
except KeyError:
LOG.error(_LE("Backward compatibility unavailable for "
"deprecated policy %s. The policy will "
"not be enforced"), pol)
init()
_ENFORCER.set_rules(policies, overwrite)

View File

@ -562,36 +562,6 @@ class NeutronPolicyTestCase(base.BaseTestCase):
def test_enforce_tenant_id_check_invalid_parent_resource_raises(self):
self._test_enforce_tenant_id_raises('tenant_id:%(foobaz_tenant_id)s')
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
policy.set_rules(input_rules.copy())
# verify deprecated policy has been removed
for pol in input_rules.keys():
self.assertNotIn(pol, policy._ENFORCER.rules)
# verify deprecated policy was correctly translated. Iterate
# over items for compatibility with unittest2 in python 2.6
for rule in expected_rules:
self.assertIn(rule, policy._ENFORCER.rules)
self.assertEqual(str(policy._ENFORCER.rules[rule]),
expected_rules[rule])
def test_set_rules_with_deprecated_view_policy(self):
self._test_set_rules_with_deprecated_policy(
{'extension:router:view': 'rule:admin_or_owner'},
{'get_network:router:external': 'rule:admin_or_owner'})
def test_set_rules_with_deprecated_set_policy(self):
expected_policies = ['create_network:provider:network_type',
'create_network:provider:physical_network',
'create_network:provider:segmentation_id',
'update_network:provider:network_type',
'update_network:provider:physical_network',
'update_network:provider:segmentation_id']
self._test_set_rules_with_deprecated_policy(
{'extension:provider_network:set': 'rule:admin_only'},
dict((policy, 'rule:admin_only') for policy in
expected_policies))
def test_process_rules(self):
action = "create_" + FAKE_RESOURCE_NAME
# Construct RuleChecks for an action, attribute and subattribute