policy checks are now being done in the api layer

API layer talks to object layer to get DB records, it then
uses that DB record and context object to perform policy checks
to determine if the necessary API request has required
authorization to access the REST endpoint/resource.

Modified tests to use new methods in API which are doing
policy checks.

Closes-Bug: 1517177
Depends-On: I385c161bc10d6a22c6b46fa19dc7c195ff222f8b

Change-Id: I767f59061cc9aa1df20bde0b1fe33d069e01d751
This commit is contained in:
Saurabh Surana 2015-12-08 14:46:23 -08:00 committed by dagnello
parent 32a62eb42e
commit 880a103778
3 changed files with 43 additions and 50 deletions

View File

@ -34,6 +34,7 @@ from cue.api.controllers import base
from cue.common import exception
from cue.common.i18n import _ # noqa
from cue.common.i18n import _LI # noqa
from cue.common import policy
from cue.common import validate_auth_token as auth_validate
from cue import objects
from cue.taskflow import client as task_flow_client
@ -127,6 +128,9 @@ def get_complete_cluster(context, cluster_id):
cluster_obj = objects.Cluster.get_cluster_by_id(context, cluster_id)
target = {'tenant_id': cluster_obj.project_id}
policy.check("cluster:get", context, target)
cluster_as_dict = cluster_obj.as_dict()
# convert 'network_id' to list for ClusterDetails compatibility
@ -152,37 +156,10 @@ def get_complete_cluster(context, cluster_id):
return cluster
class ClusterController(rest.RestController):
"""Manages operations on specific Cluster of nodes."""
@wsme_pecan.wsexpose(Cluster, wtypes.text, status_code=200)
def get_one(self, cluster_id):
"""Return this cluster."""
# validate cluster_id is of type Uuid
try:
wtypes.UuidType().validate(cluster_id)
except ValueError:
raise exception.Invalid(_("Invalid cluster ID format provided"))
context = pecan.request.context
cluster = get_complete_cluster(context, cluster_id)
cluster.unset_empty_fields()
return cluster
@wsme_pecan.wsexpose(None, wtypes.text, status_code=202)
def delete(self, cluster_id):
"""Delete this Cluster."""
# validate cluster_id is of type Uuid
try:
wtypes.UuidType().validate(cluster_id)
except ValueError:
raise exception.Invalid(_("Invalid cluster ID format provided"))
context = pecan.request.context
def delete_complete_cluster(context, cluster_id):
cluster_obj = objects.Cluster.get_cluster_by_id(context, cluster_id)
target = {'tenant_id': cluster_obj.project_id}
policy.check("cluster:delete", context, target)
# update cluster to deleting
objects.Cluster.update_cluster_deleting(context, cluster_id)
@ -217,6 +194,38 @@ class ClusterController(rest.RestController):
'%(job_id)s') % ({"cluster_id": cluster_id,
"job_id": job_uuid}))
class ClusterController(rest.RestController):
"""Manages operations on specific Cluster of nodes."""
@wsme_pecan.wsexpose(Cluster, wtypes.text, status_code=200)
def get_one(self, cluster_id):
"""Return this cluster."""
# validate cluster_id is of type Uuid
try:
wtypes.UuidType().validate(cluster_id)
except ValueError:
raise exception.Invalid(_("Invalid cluster ID format provided"))
context = pecan.request.context
cluster = get_complete_cluster(context, cluster_id)
cluster.unset_empty_fields()
return cluster
@wsme_pecan.wsexpose(None, wtypes.text, status_code=202)
def delete(self, cluster_id):
"""Delete this Cluster."""
# validate cluster_id is of type Uuid
try:
wtypes.UuidType().validate(cluster_id)
except ValueError:
raise exception.Invalid(_("Invalid cluster ID format provided"))
context = pecan.request.context
delete_complete_cluster(context, cluster_id)
@wsme_pecan.wsexpose([Cluster], status_code=200)
def get_all(self):
"""Return list of Clusters."""

View File

@ -16,7 +16,6 @@
# Copyright [2014] Hewlett-Packard Development Company, L.P.
# limitations under the License.
from cue.common import policy
from cue.db import api as db_api
from cue.objects import base
from cue.objects import utils as obj_utils
@ -58,9 +57,6 @@ class Cluster(base.CueObject):
self['project_id'] = context.project_id
cluster_changes = self.obj_get_changes()
target = {'tenant_id': self['project_id']}
policy.check('clusters:create', context, target)
db_cluster = self.dbapi.create_cluster(context, cluster_changes)
self._from_db_object(self, db_cluster)
@ -84,9 +80,6 @@ class Cluster(base.CueObject):
:returns: a list of :class:'Cluster' object.
"""
target = {'tenant_id': context.tenant}
policy.check('clusters:get', context, target)
db_clusters = cls.dbapi.get_clusters(context)
return [Cluster._from_db_object(Cluster(), obj) for obj in db_clusters]
@ -100,13 +93,7 @@ class Cluster(base.CueObject):
"""
db_cluster = cls.dbapi.get_cluster_by_id(context, cluster_id)
target = {'tenant_id': db_cluster.project_id}
policy.check("cluster:get", context, target)
cluster = Cluster._from_db_object(Cluster(), db_cluster)
return cluster
return Cluster._from_db_object(Cluster(), db_cluster)
@classmethod
def update_cluster_deleting(cls, context, cluster_id):
@ -116,7 +103,4 @@ class Cluster(base.CueObject):
:param cluster_id: UUID of a cluster
"""
db_cluster = cls.dbapi.get_cluster_by_id(context, cluster_id)
target = {'tenant_id': db_cluster.project_id}
policy.check("cluster:delete", context, target)
cls.dbapi.update_cluster_deleting(context, cluster_id)

View File

@ -216,7 +216,7 @@ class ClusterObjectsTests(base.FunctionalTestCase):
objects.Cluster.dbapi, 'get_cluster_by_id',
return_value=api_cluster):
with testtools.ExpectedException(exception.NotAuthorized):
objects.Cluster.get_cluster_by_id(tenant_b, api_cluster.id)
cluster.get_complete_cluster(tenant_b, api_cluster.id)
def test_update_cluster_deleting_forbidden(self):
"""Tests delete from Cluster objects API with invalid tenant."""
@ -227,5 +227,5 @@ class ClusterObjectsTests(base.FunctionalTestCase):
objects.Cluster.dbapi, 'get_cluster_by_id',
return_value=api_cluster):
with testtools.ExpectedException(exception.NotAuthorized):
objects.Cluster.update_cluster_deleting(
cluster.delete_complete_cluster(
tenant_b, api_cluster.id)