Add RESTful endpoints for Story permissions

Having to update the whole Story in order to add/remove a single
permission entry is quite limiting. This commit addresses this
flaw by adding subcontrollers to retrieve and modify the lists of
Users and Teams who have access to a Story, without having to
modify the whole Story.

This allows much simpler use for clients in situations where
only the ACL needs modification.

Change-Id: I2ecee2c38456c5a23ae1dc7bdecb94efb2daac04
This commit is contained in:
Adam Coldrick 2019-03-08 15:07:07 +00:00 committed by Adam Coldrick
parent b7e1b2e2ae
commit 1e0338c5dc
3 changed files with 257 additions and 3 deletions

View File

@ -1,5 +1,5 @@
# Copyright (c) 2013 Mirantis Inc.
# Copyright (c) 2016 Codethink Ltd.
# Copyright (c) 2016, 2019 Codethink Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -37,8 +37,10 @@ from storyboard.api.v1 import validations
from storyboard.api.v1 import wmodels
from storyboard.common import decorators
from storyboard.common import exception as exc
from storyboard.db.api import base as api_base
from storyboard.db.api import stories as stories_api
from storyboard.db.api import subscriptions as subscription_api
from storyboard.db.api import teams as teams_api
from storyboard.db.api import timeline_events as events_api
from storyboard.db.api import users as users_api
@ -60,6 +62,133 @@ def create_story_wmodel(story):
return story_model
class UsersSubcontroller(rest.RestController):
"""Manage Users who can access the Story."""
@decorators.db_exceptions
@secure(checks.guest)
@wsme_pecan.wsexpose([wmodels.User], int)
def get(self, story_id):
"""Get users with access to a story.
Example::
curl https://my.example.org/api/v1/stories/1/users
:param story_id: ID of the story to get users for.
"""
story = stories_api.story_get_simple(
story_id, current_user=request.current_user_id)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
if not story.permissions:
return []
permission = story.permissions[0]
users = [api_base._filter_non_public_fields(user, user._public_fields)
for user in permission.users]
return [wmodels.User.from_db_model(user) for user in users]
@decorators.db_exceptions
@secure(checks.authenticated)
@wsme_pecan.wsexpose(wmodels.User, int, int)
def put(self, story_id, user_id):
"""Add a user to a story.
Example::
TODO
:param story_id: ID of the story to add a user to.
:param user_id: ID of the user.
"""
stories_api.add_user(story_id, user_id, request.current_user_id)
user = users_api.user_get(user_id)
user = api_base._filter_non_public_fields(user, user._public_fields)
return wmodels.User.from_db_model(user)
@decorators.db_exceptions
@secure(checks.authenticated)
@wsme_pecan.wsexpose(None, int, int, status_code=204)
def delete(self, story_id, user_id):
"""Delete a user from a team.
Example::
TODO
:param team_id: An ID of the team.
:param user_id: An ID of the user.
"""
stories_api.delete_user(story_id, user_id, request.current_user_id)
class TeamsSubcontroller(rest.RestController):
"""Manage Teams who can access the story."""
@decorators.db_exceptions
@secure(checks.guest)
@wsme_pecan.wsexpose([wmodels.Team], int)
def get(self, story_id):
"""Get users inside a team.
Example::
curl https://my.example.org/api/v1/teams/1/users
:param team_id: An ID of the team.
"""
story = stories_api.story_get_simple(
story_id, current_user=request.current_user_id)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
if not story.permissions:
return []
permission = story.permissions[0]
return [wmodels.Team.from_db_model(team) for team in permission.teams]
@decorators.db_exceptions
@secure(checks.authenticated)
@wsme_pecan.wsexpose(wmodels.Team, int, int)
def put(self, story_id, team_id):
"""Add a team to a story.
Example::
TODO
:param story_id: ID of the story to add a team to.
:param team_id: ID of the team.
"""
stories_api.add_team(story_id, team_id, request.current_user_id)
team = teams_api.team_get(team_id)
return wmodels.Team.from_db_model(team)
@decorators.db_exceptions
@secure(checks.authenticated)
@wsme_pecan.wsexpose(None, int, int, status_code=204)
def delete(self, story_id, team_id):
"""Delete a team from a story.
Example::
TODO
:param story_id: ID of the story to remove a team from.
:param team_id: ID of the team.
"""
stories_api.delete_team(story_id, team_id, request.current_user_id)
class StoriesController(rest.RestController):
"""Manages operations on stories."""
@ -364,6 +493,8 @@ class StoriesController(rest.RestController):
events = NestedTimeLineEventsController()
tasks = TasksNestedController()
tags = TagsController()
teams = TeamsSubcontroller()
users = UsersSubcontroller()
@decorators.db_exceptions
@secure(checks.guest)

View File

@ -17,6 +17,7 @@ import datetime
import pytz
from sqlalchemy.orm import subqueryload
from wsme.exc import ClientSideError
from storyboard._i18n import _
from storyboard.common import exception as exc
@ -454,3 +455,125 @@ def update_permission(story, users, teams, session=None):
return api_base.entity_update(models.Permission,
permission.id,
permission_dict)
def add_user(story_id, user_id, current_user=None):
session = api_base.get_session()
with session.begin(subtransactions=True):
story = story_get_simple(
story_id, session=session, current_user=current_user)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
user = users_api.user_get(user_id, session=session)
if not user:
raise exc.NotFound(_("User %s not found") % user_id)
if not story.permissions:
create_permission(story, [user], [], session)
return
permission = story.permissions[0]
if user_id in [u.id for u in permission.users]:
raise ClientSideError(_("The User %{user_id}d is already in the "
"permission list for Story "
"%{story_id}d") %
{"user_id": user_id, "story_id": story_id})
permission.users.append(user)
session.add(permission)
return story
def delete_user(story_id, user_id, current_user=None):
session = api_base.get_session()
with session.begin(subtransactions=True):
story = story_get_simple(
story_id, session=session, current_user=current_user)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
user = users_api.user_get(user_id, session=session)
if not user:
raise exc.NotFound(_("User %s not found") % user_id)
if not story.permissions:
raise ClientSideError(_("The User %{user_id}d isn't in the "
"permission list for Story "
"%{story_id}d") %
{"user_id": user_id, "story_id": story_id})
permission = story.permissions[0]
if user_id not in [u.id for u in permission.users]:
raise ClientSideError(_("The User %{user_id}d isn't in the "
"permission list for Story "
"%{story_id}d") %
{"user_id": user_id, "story_id": story_id})
entry = [u for u in permission.users if u.id == user_id][0]
permission.users.remove(entry)
session.add(permission)
return story
def add_team(story_id, team_id, current_user=None):
session = api_base.get_session()
with session.begin(subtransactions=True):
story = story_get_simple(
story_id, session=session, current_user=current_user)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
team = teams_api.team_get(team_id, session=session)
if not team:
raise exc.NotFound(_("Team %s not found") % team_id)
if not story.permissions:
create_permission(story, [], [team], session)
return
permission = story.permissions[0]
if team_id in [t.id for t in permission.teams]:
raise ClientSideError(_("The Team %{team_id}d is already in the "
"permission list for Story "
"%{story_id}d") %
{"team_id": team_id, "story_id": story_id})
permission.teams.append(team)
session.add(permission)
return story
def delete_team(story_id, team_id, current_user=None):
session = api_base.get_session()
with session.begin(subtransactions=True):
story = story_get_simple(
story_id, session=session, current_user=current_user)
if not story:
raise exc.NotFound(_("Story %s not found") % story_id)
team = teams_api.team_get(team_id, session=session)
if not team:
raise exc.NotFound(_("User %s not found") % team_id)
if not story.permissions:
raise ClientSideError(_("The Team %{team_id}d isn't in the "
"permission list for Story "
"%{story_id}d") %
{"team_id": team_id, "story_id": story_id})
permission = story.permissions[0]
if team_id not in [t.id for t in permission.teams]:
raise ClientSideError(_("The Team %{team_id}d isn't in the "
"permission list for Story "
"%{story_id}d") %
{"team_id": team_id, "story_id": story_id})
entry = [t for t in permission.teams if t.id == team_id][0]
permission.teams.remove(entry)
session.add(permission)
return story

View File

@ -34,8 +34,8 @@ def _entity_get(id, session=None):
return query.first()
def team_get(team_id):
return _entity_get(team_id)
def team_get(team_id, session=None):
return _entity_get(team_id, session=session)
def _team_build_query(project_id=None, **kwargs):