Merge "Add missing 'target_obj' when perform policy check"

This commit is contained in:
Zuul 2018-03-23 16:28:19 +00:00 committed by Gerrit Code Review
commit cc01f2e041
19 changed files with 127 additions and 60 deletions

View File

@ -78,11 +78,11 @@ class AdminController(wsgi.Controller):
explanation=_("Must specify a valid status"))
return update
def authorize(self, context, action_name):
def authorize(self, context, action_name, target_obj=None):
context.authorize(
'volume_extension:%(resource)s_admin_actions:%(action)s' %
{'resource': self.resource_name,
'action': action_name})
'action': action_name}, target_obj=target_obj)
def _remove_worker(self, context, id):
# Remove the cleanup worker from the DB when we change a resource
@ -107,7 +107,6 @@ class AdminController(wsgi.Controller):
'attached_mode')
context = req.environ['cinder.context']
self.authorize(context, 'reset_status')
update = self.validate_update(body['os-reset_status'])
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,
@ -132,9 +131,9 @@ class AdminController(wsgi.Controller):
def _force_delete(self, req, id, body):
"""Delete a resource, bypassing the check that it must be available."""
context = req.environ['cinder.context']
self.authorize(context, 'force_delete')
# Not found exception will be handled at the wsgi level
resource = self._get(context, id)
self.authorize(context, 'force_delete', target_obj=resource)
self._delete(context, resource, force=True)
@ -158,6 +157,10 @@ class VolumeAdminController(AdminController):
'none', 'starting',)
def _update(self, *args, **kwargs):
context = args[0]
volume_id = args[1]
volume = objects.Volume.get_by_id(context, volume_id)
self.authorize(context, 'reset_status', target_obj=volume)
db.volume_update(*args, **kwargs)
def _get(self, *args, **kwargs):
@ -204,9 +207,9 @@ class VolumeAdminController(AdminController):
def _force_detach(self, req, id, body):
"""Roll back a bad detach after the volume been disconnected."""
context = req.environ['cinder.context']
self.authorize(context, 'force_detach')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'force_detach', target_obj=volume)
try:
connector = body['os-force_detach'].get('connector', None)
except AttributeError:
@ -242,9 +245,9 @@ class VolumeAdminController(AdminController):
def _migrate_volume(self, req, id, body):
"""Migrate a volume to the specified host."""
context = req.environ['cinder.context']
self.authorize(context, 'migrate_volume')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'migrate_volume', target_obj=volume)
params = body['os-migrate_volume']
cluster_name, host = common.get_cluster_host(req, params,
@ -258,9 +261,9 @@ class VolumeAdminController(AdminController):
def _migrate_volume_completion(self, req, id, body):
"""Complete an in-progress migration."""
context = req.environ['cinder.context']
self.authorize(context, 'migrate_volume_completion')
# Not found exception will be handled at the wsgi level
volume = self._get(context, id)
self.authorize(context, 'migrate_volume_completion', target_obj=volume)
params = body['os-migrate_volume_completion']
try:
new_volume_id = params['new_volume']
@ -286,6 +289,7 @@ class SnapshotAdminController(AdminController):
snapshot_id = args[1]
fields = args[2]
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
self.authorize(context, 'reset_status', target_obj=snapshot)
snapshot.update(fields)
snapshot.save()
@ -316,7 +320,6 @@ class BackupAdminController(AdminController):
def _reset_status(self, req, id, body):
"""Reset status on the resource."""
context = req.environ['cinder.context']
self.authorize(context, 'reset_status')
update = self.validate_update(body['os-reset_status'])
msg = "Updating %(resource)s '%(id)s' with '%(update)r'"
LOG.debug(msg, {'resource': self.resource_name, 'id': id,

View File

@ -40,8 +40,6 @@ class SnapshotActionsController(wsgi.Controller):
"""
context = req.environ['cinder.context']
context.authorize(policy.UPDATE_STATUS_POLICY)
LOG.debug("body: %s", body)
try:
status = body['os-update_snapshot_status']['status']
@ -59,6 +57,8 @@ class SnapshotActionsController(wsgi.Controller):
fields.SnapshotStatus.ERROR_DELETING]}
current_snapshot = objects.Snapshot.get_by_id(context, id)
context.authorize(policy.UPDATE_STATUS_POLICY,
target_obj=current_snapshot)
if current_snapshot.status not in status_map:
msg = _("Snapshot status %(cur)s not allowed for "

View File

@ -84,13 +84,13 @@ class SnapshotManageController(wsgi.Controller):
"""
context = req.environ['cinder.context']
context.authorize(policy.MANAGE_POLICY)
snapshot = body['snapshot']
# Check whether volume exists
volume_id = snapshot['volume_id']
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, volume_id)
context.authorize(policy.MANAGE_POLICY, target_obj=volume)
LOG.debug('Manage snapshot request body: %s', body)

View File

@ -46,12 +46,12 @@ class SnapshotUnmanageController(wsgi.Controller):
A Not Found error is returned if the specified snapshot does not exist.
"""
context = req.environ['cinder.context']
context.authorize(policy.UNMANAGE_POLICY)
LOG.info("Unmanage snapshot with id: %s", id)
try:
snapshot = self.volume_api.get_snapshot(context, id)
context.authorize(policy.UNMANAGE_POLICY, target_obj=snapshot)
self.volume_api.delete_snapshot(context, snapshot,
unmanage_only=True)
# Not found exception will be handled at the wsgi level

View File

@ -233,7 +233,7 @@ class VolumeActionsController(wsgi.Controller):
# Not found exception will be handled at the wsgi level
volume = self.volume_api.get(context, id)
context.authorize(policy.UPLOAD_IMAGE_POLICY)
context.authorize(policy.UPLOAD_IMAGE_POLICY, target_obj=volume)
# check for valid disk-format
disk_format = params.get("disk_format", "raw")
if not image_utils.validate_disk_format(disk_format):
@ -270,7 +270,8 @@ class VolumeActionsController(wsgi.Controller):
image_metadata['protected'] = params.get('protected', 'False')
if image_metadata['visibility'] == 'public':
context.authorize(policy.UPLOAD_PUBLIC_POLICY)
context.authorize(policy.UPLOAD_PUBLIC_POLICY,
target_obj=volume)
image_metadata['protected'] = (
utils.get_bool_param('protected', image_metadata))

View File

@ -18,6 +18,7 @@
from cinder.api import extensions
from cinder.api.openstack import wsgi
from cinder import db
from cinder import objects
from cinder.policies import volumes as policy
@ -27,7 +28,9 @@ class VolumeEncryptionMetadataController(wsgi.Controller):
def index(self, req, volume_id):
"""Returns the encryption metadata for a given volume."""
context = req.environ['cinder.context']
context.authorize(policy.ENCRYPTION_METADATA_POLICY)
volume = objects.Volume.get_by_id(context, volume_id)
context.authorize(policy.ENCRYPTION_METADATA_POLICY,
target_obj=volume)
return db.volume_encryption_metadata_get(context, volume_id)
def show(self, req, volume_id, id):

View File

@ -25,6 +25,7 @@ from cinder.api.schemas import volume_image_metadata
from cinder.api import validation
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.policies import volume_metadata as policy
from cinder import volume
@ -88,7 +89,9 @@ class VolumeImageMetadataController(wsgi.Controller):
@validation.schema(volume_image_metadata.set_image_metadata)
def create(self, req, id, body):
context = req.environ['cinder.context']
if context.authorize(policy.IMAGE_METADATA_POLICY):
volume = objects.Volume.get_by_id(context, id)
if context.authorize(policy.IMAGE_METADATA_POLICY,
target_obj=volume):
metadata = body['os-set_image_metadata']['metadata']
new_metadata = self._update_volume_image_metadata(context,
id,
@ -128,7 +131,8 @@ class VolumeImageMetadataController(wsgi.Controller):
def delete(self, req, id, body):
"""Deletes an existing image metadata."""
context = req.environ['cinder.context']
if context.authorize(policy.IMAGE_METADATA_POLICY):
volume = objects.Volume.get_by_id(context, id)
if context.authorize(policy.IMAGE_METADATA_POLICY, target_obj=volume):
key = body['os-unset_image_metadata']['key']
vol, metadata = self._get_image_metadata(context, id)

View File

@ -47,12 +47,12 @@ class VolumeUnmanageController(wsgi.Controller):
attached to an instance.
"""
context = req.environ['cinder.context']
context.authorize(policy.UNMANAGE_POLICY)
LOG.info("Unmanage volume with id: %s", id)
# Not found exception will be handled at the wsgi level
vol = self.volume_api.get(context, id)
context.authorize(policy.UNMANAGE_POLICY, target_obj=vol)
self.volume_api.delete(context, vol, unmanage_only=True)
return webob.Response(status_int=http_client.ACCEPTED)

View File

@ -57,7 +57,7 @@ class MessagesController(wsgi.Controller):
# Not found exception will be handled at the wsgi level
message = self.message_api.get(context, id)
context.authorize(policy.GET_POLICY)
context.authorize(policy.GET_POLICY, target_obj=message)
self._build_user_message(message)
return self._view_builder.detail(req, message)

View File

@ -66,11 +66,11 @@ class VolumeController(volumes_v2.VolumeController):
LOG.info("Delete volume with id: %(id)s %(params)s",
{'id': id, 'params': params}, context=context)
if force:
context.authorize(policy.FORCE_DELETE_POLICY)
volume = self.volume_api.get(context, id)
if force:
context.authorize(policy.FORCE_DELETE_POLICY, target_obj=volume)
self.volume_api.delete(context, volume,
cascade=cascade,
force=force)

View File

@ -36,6 +36,7 @@ from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder.policies import backup_actions as backup_action_policy
from cinder.policies import backups as policy
import cinder.policy
from cinder import quota
@ -65,8 +66,9 @@ class API(base.Base):
super(API, self).__init__(db)
def get(self, context, backup_id):
context.authorize(policy.GET_POLICY)
return objects.Backup.get_by_id(context, backup_id)
backup = objects.Backup.get_by_id(context, backup_id)
context.authorize(policy.GET_POLICY, target_obj=backup)
return backup
def _check_support_to_force_delete(self, context, backup_host):
result = self.backup_rpcapi.check_support_to_force_delete(context,
@ -84,7 +86,7 @@ class API(base.Base):
:raises BackupDriverException:
:raises ServiceNotFound:
"""
context.authorize(policy.DELETE_POLICY)
context.authorize(policy.DELETE_POLICY, target_obj=backup)
if not force and backup.status not in [fields.BackupStatus.AVAILABLE,
fields.BackupStatus.ERROR]:
msg = _('Backup status must be available or error')
@ -198,8 +200,8 @@ class API(base.Base):
container, incremental=False, availability_zone=None,
force=False, snapshot_id=None, metadata=None):
"""Make the RPC call to create a volume backup."""
context.authorize(policy.CREATE_POLICY)
volume = self.volume_api.get(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume)
snapshot = None
if snapshot_id:
snapshot = self.volume_api.get_snapshot(context, snapshot_id)
@ -336,8 +338,8 @@ class API(base.Base):
def restore(self, context, backup_id, volume_id=None, name=None):
"""Make the RPC call to restore a volume backup."""
context.authorize(policy.RESTORE_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.RESTORE_POLICY, target_obj=backup)
if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = _('Backup status must be available')
raise exception.InvalidBackup(reason=msg)
@ -421,6 +423,9 @@ class API(base.Base):
"""
# get backup info
backup = self.get(context, backup_id)
context.authorize(
backup_action_policy.BASE_POLICY_NAME % "reset_status",
target_obj=backup)
backup.host = self._get_available_backup_service_host(
backup.host, backup.availability_zone)
backup.save()
@ -439,8 +444,8 @@ class API(base.Base):
:returns: contains 'backup_url' and 'backup_service'
:raises InvalidBackup:
"""
context.authorize(policy.EXPORT_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.EXPORT_POLICY, target_obj=backup)
if backup['status'] != fields.BackupStatus.AVAILABLE:
msg = (_('Backup status must be available and not %s.') %
backup['status'])
@ -560,8 +565,8 @@ class API(base.Base):
return backup
def update(self, context, backup_id, fields):
context.authorize(policy.UPDATE_POLICY)
backup = self.get(context, backup_id)
context.authorize(policy.UPDATE_POLICY, target_obj=backup)
backup.update(fields)
backup.save()
return backup

View File

@ -877,7 +877,8 @@ class API(base.Base):
return group_snapshot
def delete_group_snapshot(self, context, group_snapshot, force=False):
context.authorize(gsnap_policy.DELETE_POLICY)
context.authorize(gsnap_policy.DELETE_POLICY,
target_obj=group_snapshot)
group_snapshot.assert_not_frozen()
values = {'status': 'deleting'}
expected = {'status': ('available', 'error')}
@ -904,15 +905,18 @@ class API(base.Base):
group_snapshot)
def update_group_snapshot(self, context, group_snapshot, fields):
context.authorize(gsnap_policy.UPDATE_POLICY)
context.authorize(gsnap_policy.UPDATE_POLICY,
target_obj=group_snapshot)
group_snapshot.update(fields)
group_snapshot.save()
def get_group_snapshot(self, context, group_snapshot_id):
context.authorize(gsnap_policy.GET_POLICY)
group_snapshots = objects.GroupSnapshot.get_by_id(context,
group_snapshot_id)
return group_snapshots
group_snapshot = objects.GroupSnapshot.get_by_id(context,
group_snapshot_id)
context.authorize(gsnap_policy.GET_POLICY,
target_obj=group_snapshot)
return group_snapshot
def get_all_group_snapshots(self, context, filters=None, marker=None,
limit=None, offset=None, sort_keys=None,
@ -936,7 +940,8 @@ class API(base.Base):
def reset_group_snapshot_status(self, context, gsnapshot, status):
"""Reset status of group snapshot"""
context.authorize(gsnap_action_policy.RESET_STATUS)
context.authorize(gsnap_action_policy.RESET_STATUS,
target_obj=gsnapshot)
field = {'updated_at': timeutils.utcnow(),
'status': status}
gsnapshot.update(field)

View File

@ -232,11 +232,12 @@ class AdminActionsTest(BaseAdminTest):
volume = db.volume_get(self.ctx, volume['id'])
self.assertEqual('error', volume['status'])
def test_reset_status_as_non_admin(self):
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_reset_status_as_non_admin(self, fake_get):
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
volume = db.volume_create(self.ctx,
{'status': 'error', 'size': 1})
fake_get.return_value = volume
resp = self._issue_volume_reset(ctx,
volume,
{'status': 'error'})
@ -634,11 +635,13 @@ class AdminActionsTest(BaseAdminTest):
volume = self._migrate_volume_exec(self.ctx, volume, host,
expected_status)
def test_migrate_volume_as_non_admin(self):
@mock.patch("cinder.volume.api.API.get")
def test_migrate_volume_as_non_admin(self, fake_get):
expected_status = http_client.FORBIDDEN
host = 'test2'
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
volume = self._migrate_volume_prep()
fake_get.return_value = volume
self._migrate_volume_exec(ctx, volume, host, expected_status)
def test_migrate_volume_without_host_parameter(self):
@ -716,11 +719,13 @@ class AdminActionsTest(BaseAdminTest):
else:
self.assertNotIn('save_volume_id', resp_dict)
def test_migrate_volume_comp_as_non_admin(self):
@mock.patch("cinder.volume.api.API.get")
def test_migrate_volume_comp_as_non_admin(self, fake_get):
volume = db.volume_create(self.ctx, {'id': fake.VOLUME_ID})
new_volume = db.volume_create(self.ctx, {'id': fake.VOLUME2_ID})
expected_status = http_client.FORBIDDEN
expected_id = None
fake_get.return_value = volume
ctx = context.RequestContext(fake.USER_ID, fake.PROJECT_ID)
self._migrate_volume_comp_exec(ctx, volume, new_volume, False,
expected_status, expected_id)

View File

@ -105,8 +105,8 @@ class ExtendedSnapshotAttributesTest(test.TestCase):
self.assertSnapshotAttributes(self._get_snapshot(res.body),
project_id=fake.PROJECT_ID,
progress='0%')
calls = [mock.call(snap_policy.GET_POLICY), mock.call(
snap_policy.EXTEND_ATTRIBUTE, fatal=False)]
calls = [mock.call(snap_policy.GET_POLICY, target_obj=snapshot_obj),
mock.call(snap_policy.EXTEND_ATTRIBUTE, fatal=False)]
mock_authorize.assert_has_calls(calls)
@mock.patch('cinder.context.RequestContext.authorize')

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
import uuid
from oslo_policy import policy as oslo_policy
@ -201,7 +202,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual({'key1': 'value1', 'key2': 'value2'},
self._get_image_metadata_list(res.body)[0])
def test_create_image_metadata(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_image_metadata(self, fake_get):
self.mock_object(volume.api.API, 'get_volume_image_metadata',
return_empty_image_metadata)
self.mock_object(db, 'volume_metadata_update',
@ -213,6 +215,7 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = "POST"
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
@ -220,12 +223,14 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual(fake_image_metadata,
jsonutils.loads(res.body)["metadata"])
def test_create_image_metadata_policy_not_authorized(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_image_metadata_policy_not_authorized(self, fake_get):
rules = {
metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.addCleanup(policy.reset)
fake_get.return_value = {}
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
@ -241,13 +246,15 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=body)
def test_create_with_keys_case_insensitive(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_with_keys_case_insensitive(self, fake_get):
# If the keys in uppercase_and_lowercase, should return the one
# which server added
self.mock_object(volume.api.API, 'get_volume_image_metadata',
return_empty_image_metadata)
self.mock_object(db, 'volume_metadata_update',
fake_create_volume_metadata)
fake_get.return_value = {}
body = {
"os-set_image_metadata": {
@ -272,11 +279,13 @@ class VolumeImageMetadataTest(test.TestCase):
self.assertEqual(fake_image_metadata,
jsonutils.loads(res.body)["metadata"])
def test_create_empty_body(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_create_empty_body(self, fake_get):
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID))
req.method = 'POST'
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.ValidationError,
self.controller.create, req, fake.VOLUME_ID,
@ -297,7 +306,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=body)
def test_invalid_metadata_items_on_create(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_invalid_metadata_items_on_create(self, fake_get):
self.mock_object(db, 'volume_metadata_update',
fake_create_volume_metadata)
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
@ -308,6 +318,7 @@ class VolumeImageMetadataTest(test.TestCase):
data = {"os-set_image_metadata": {
"metadata": {"a" * 260: "value1"}}
}
fake_get.return_value = {}
# Test for long key
req.body = jsonutils.dump_as_bytes(data)
@ -333,7 +344,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.create, req, fake.VOLUME_ID,
body=data)
def test_delete(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete(self, fake_get):
self.mock_object(db, 'volume_metadata_delete',
volume_metadata_delete)
@ -345,17 +357,20 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
res = req.get_response(fakes.wsgi_app(
fake_auth_context=self.user_ctxt))
self.assertEqual(http_client.OK, res.status_int)
def test_delete_image_metadata_policy_not_authorized(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_image_metadata_policy_not_authorized(self, fake_get):
rules = {
metadata_policy.IMAGE_METADATA_POLICY: base_policy.RULE_ADMIN_API
}
policy.set_rules(oslo_policy.Rules.from_dict(rules))
self.addCleanup(policy.reset)
fake_get.return_value = {}
req = fakes.HTTPRequest.blank('/v2/%s/volumes/%s/action' % (
fake.PROJECT_ID, fake.VOLUME_ID), use_admin_context=False)
@ -371,7 +386,8 @@ class VolumeImageMetadataTest(test.TestCase):
self.controller.delete, req, fake.VOLUME_ID,
body=None)
def test_delete_meta_not_found(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_meta_not_found(self, fake_get):
data = {"os-unset_image_metadata": {
"key": "invalid_id"}
}
@ -380,12 +396,14 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(data)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.GlanceMetadataNotFound,
self.controller.delete, req, fake.VOLUME_ID,
body=data)
def test_delete_nonexistent_volume(self):
@mock.patch('cinder.objects.Volume.get_by_id')
def test_delete_nonexistent_volume(self, fake_get):
self.mock_object(db, 'volume_metadata_delete',
return_volume_nonexistent)
@ -397,6 +415,7 @@ class VolumeImageMetadataTest(test.TestCase):
req.method = 'POST'
req.body = jsonutils.dump_as_bytes(body)
req.headers["content-type"] = "application/json"
fake_get.return_value = {}
self.assertRaises(exception.GlanceMetadataNotFound,
self.controller.delete, req, fake.VOLUME_ID,

View File

@ -25,6 +25,7 @@ from cinder import exception
import cinder.group
from cinder import objects
from cinder.objects import fields
from cinder.policies import group_snapshots as g_snap_policies
from cinder import quota
from cinder import test
from cinder.tests.unit import fake_constants as fake
@ -297,12 +298,16 @@ class GroupAPITestCase(test.TestCase):
remove_volumes=vol2.id)
@mock.patch('cinder.objects.GroupSnapshot.get_by_id')
def test_get_group_snapshot(self, mock_group_snap):
@mock.patch('cinder.context.RequestContext.authorize')
def test_get_group_snapshot(self, mock_authorize, mock_group_snap):
fake_group_snap = 'fake_group_snap'
mock_group_snap.return_value = fake_group_snap
grp_snap = self.group_api.get_group_snapshot(
self.ctxt, fake.GROUP_SNAPSHOT_ID)
self.assertEqual(fake_group_snap, grp_snap)
mock_authorize.assert_called_once_with(
g_snap_policies.GET_POLICY,
target_obj=fake_group_snap)
@ddt.data(True, False)
@mock.patch('cinder.objects.GroupSnapshotList.get_all')

View File

@ -17,6 +17,8 @@ from oslo_config import cfg
from cinder import context
from cinder import exception
from cinder import objects
from cinder.policies import volume_actions as vol_action_policies
from cinder.policies import volumes as volume_policies
from cinder import quota
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import volume as base
@ -63,8 +65,9 @@ class VolumeRetypeTestCase(base.BaseVolumeTestCase):
else:
return self.default_vol_type
@mock.patch('cinder.context.RequestContext.authorize')
@mock.patch.object(volume_types, 'get_by_name_or_id')
def test_retype_multiattach(self, _mock_get_types):
def test_retype_multiattach(self, _mock_get_types, mock_authorize):
"""Verify multiattach retype restrictions."""
_mock_get_types.side_effect = self.fake_get_vtype
@ -106,3 +109,15 @@ class VolumeRetypeTestCase(base.BaseVolumeTestCase):
self.context,
vol,
'multiattach-type')
mock_authorize.assert_has_calls(
[mock.call(volume_policies.CREATE_POLICY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
mock.call(volume_policies.MULTIATTACH_POLICY,
target_obj=mock.ANY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(volume_policies.MULTIATTACH_POLICY),
mock.call(volume_policies.CREATE_POLICY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
mock.call(vol_action_policies.RETYPE_POLICY, target_obj=mock.ANY),
])

View File

@ -66,10 +66,10 @@ class API(base.Base):
def delete(self, context, transfer_id):
"""Make the RPC call to delete a volume transfer."""
context.authorize(policy.DELETE_POLICY)
transfer = self.db.transfer_get(context, transfer_id)
volume_ref = self.db.volume_get(context, transfer.volume_id)
context.authorize(policy.DELETE_POLICY, target_obj=volume_ref)
volume_utils.notify_about_volume_usage(context, volume_ref,
"transfer.delete.start")
if volume_ref['status'] != 'awaiting-transfer':
@ -115,9 +115,9 @@ class API(base.Base):
def create(self, context, volume_id, display_name):
"""Creates an entry in the transfers table."""
context.authorize(policy.CREATE_POLICY)
LOG.info("Generating transfer record for volume %s", volume_id)
volume_ref = self.db.volume_get(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume_ref)
if volume_ref['status'] != "available":
raise exception.InvalidVolume(reason=_("status must be available"))
if volume_ref['encryption_key_id'] is not None:

View File

@ -645,8 +645,8 @@ class API(base.Base):
return volumes
def get_snapshot(self, context, snapshot_id):
context.authorize(snapshot_policy.GET_POLICY)
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
context.authorize(snapshot_policy.GET_POLICY, target_obj=snapshot)
# FIXME(jdg): The objects don't have the db name entries
# so build the resource tag manually for now.
@ -656,8 +656,8 @@ class API(base.Base):
return snapshot
def get_volume(self, context, volume_id):
context.authorize(vol_policy.GET_POLICY)
volume = objects.Volume.get_by_id(context, volume_id)
context.authorize(vol_policy.GET_POLICY, target_obj=volume)
LOG.info("Volume retrieved successfully.", resource=volume)
return volume
@ -863,7 +863,7 @@ class API(base.Base):
cgsnapshot_id,
commit_quota=True,
group_snapshot_id=None):
context.authorize(snapshot_policy.CREATE_POLICY)
context.authorize(snapshot_policy.CREATE_POLICY, target_obj=volume)
utils.check_metadata_properties(metadata)
if not volume.host:
@ -1656,7 +1656,8 @@ class API(base.Base):
# If they are retyping to a multiattach capable, make sure they
# are allowed to do so.
if tgt_is_multiattach:
context.authorize(vol_policy.MULTIATTACH_POLICY)
context.authorize(vol_policy.MULTIATTACH_POLICY,
target_obj=volume)
# We're checking here in so that we can report any quota issues as
# early as possible, but won't commit until we change the type. We
@ -2070,7 +2071,8 @@ class API(base.Base):
vref.status == 'in-use' and
vref.bootable):
ctxt.authorize(
attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY)
attachment_policy.MULTIATTACH_BOOTABLE_VOLUME_POLICY,
target_obj=vref)
# FIXME(JDG): We want to be able to do things here like reserve a
# volume for Nova to do BFV WHILE the volume may be in the process of