Merge "Refactor tag support"
This commit is contained in:
commit
05eceb38c3
|
@ -15,7 +15,7 @@ from openstack import resource
|
|||
from openstack import utils
|
||||
|
||||
|
||||
class Server(resource.Resource, metadata.MetadataMixin):
|
||||
class Server(resource.Resource, metadata.MetadataMixin, resource.TagMixin):
|
||||
resource_key = 'server'
|
||||
resources_key = 'servers'
|
||||
base_path = '/servers'
|
||||
|
@ -33,14 +33,13 @@ class Server(resource.Resource, metadata.MetadataMixin):
|
|||
"sort_key", "sort_dir",
|
||||
"reservation_id", "tags",
|
||||
"project_id",
|
||||
tags_any="tags-any",
|
||||
not_tags="not-tags",
|
||||
not_tags_any="not-tags-any",
|
||||
is_deleted="deleted",
|
||||
ipv4_address="ip",
|
||||
ipv6_address="ip6",
|
||||
changes_since="changes-since",
|
||||
all_projects="all_tenants")
|
||||
all_projects="all_tenants",
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
#: A list of dictionaries holding links relevant to this server.
|
||||
links = resource.Body('links')
|
||||
|
|
|
@ -14,7 +14,7 @@ from openstack import resource
|
|||
from openstack import utils
|
||||
|
||||
|
||||
class Project(resource.Resource):
|
||||
class Project(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'project'
|
||||
resources_key = 'projects'
|
||||
base_path = '/projects'
|
||||
|
@ -33,6 +33,7 @@ class Project(resource.Resource):
|
|||
'name',
|
||||
'parent_id',
|
||||
is_enabled='enabled',
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -20,7 +20,7 @@ from openstack import utils
|
|||
_logger = _log.setup_logging('openstack')
|
||||
|
||||
|
||||
class Image(resource.Resource):
|
||||
class Image(resource.Resource, resource.TagMixin):
|
||||
resources_key = 'images'
|
||||
base_path = '/images'
|
||||
|
||||
|
@ -232,16 +232,6 @@ class Image(resource.Resource):
|
|||
"""
|
||||
self._action(session, "reactivate")
|
||||
|
||||
def add_tag(self, session, tag):
|
||||
"""Add a tag to an image"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags', tag)
|
||||
session.put(url,)
|
||||
|
||||
def remove_tag(self, session, tag):
|
||||
"""Remove a tag from an image"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags', tag)
|
||||
session.delete(url,)
|
||||
|
||||
def upload(self, session):
|
||||
"""Upload data into an existing image"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'file')
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class FloatingIP(resource.Resource, tag.TagMixin):
|
||||
class FloatingIP(resource.Resource, resource.TagMixin):
|
||||
name_attribute = "floating_ip_address"
|
||||
resource_name = "floating ip"
|
||||
resource_key = 'floatingip'
|
||||
|
@ -33,7 +32,7 @@ class FloatingIP(resource.Resource, tag.TagMixin):
|
|||
'floating_ip_address', 'floating_network_id',
|
||||
'port_id', 'router_id', 'status', 'subnet_id',
|
||||
project_id='tenant_id',
|
||||
**tag.TagMixin._tag_query_parameters)
|
||||
**resource.TagMixin._tag_query_parameters)
|
||||
|
||||
# Properties
|
||||
#: Timestamp at which the floating IP was created.
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class Network(resource.Resource, tag.TagMixin):
|
||||
class Network(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'network'
|
||||
resources_key = 'networks'
|
||||
base_path = '/networks'
|
||||
|
@ -39,7 +38,7 @@ class Network(resource.Resource, tag.TagMixin):
|
|||
provider_network_type='provider:network_type',
|
||||
provider_physical_network='provider:physical_network',
|
||||
provider_segmentation_id='provider:segmentation_id',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class Port(resource.Resource, tag.TagMixin):
|
||||
class Port(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'port'
|
||||
resources_key = 'ports'
|
||||
base_path = '/ports'
|
||||
|
@ -35,7 +34,7 @@ class Port(resource.Resource, tag.TagMixin):
|
|||
is_admin_state_up='admin_state_up',
|
||||
is_port_security_enabled='port_security_enabled',
|
||||
project_id='tenant_id',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -10,12 +10,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
from openstack import utils
|
||||
|
||||
|
||||
class QoSPolicy(resource.Resource, tag.TagMixin):
|
||||
class QoSPolicy(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'policy'
|
||||
resources_key = 'policies'
|
||||
base_path = '/qos/policies'
|
||||
|
@ -31,7 +30,7 @@ class QoSPolicy(resource.Resource, tag.TagMixin):
|
|||
'name', 'description', 'is_default',
|
||||
project_id='tenant_id',
|
||||
is_shared='shared',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -10,12 +10,11 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
from openstack import utils
|
||||
|
||||
|
||||
class Router(resource.Resource, tag.TagMixin):
|
||||
class Router(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'router'
|
||||
resources_key = 'routers'
|
||||
base_path = '/routers'
|
||||
|
@ -34,7 +33,7 @@ class Router(resource.Resource, tag.TagMixin):
|
|||
is_distributed='distributed',
|
||||
is_ha='ha',
|
||||
project_id='tenant_id',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class SecurityGroup(resource.Resource, tag.TagMixin):
|
||||
class SecurityGroup(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'security_group'
|
||||
resources_key = 'security_groups'
|
||||
base_path = '/security-groups'
|
||||
|
@ -29,7 +28,7 @@ class SecurityGroup(resource.Resource, tag.TagMixin):
|
|||
_query_mapping = resource.QueryParameters(
|
||||
'description', 'name',
|
||||
project_id='tenant_id',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
from openstack import resource
|
||||
|
||||
|
||||
class SecurityGroupRule(resource.Resource):
|
||||
class SecurityGroupRule(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'security_group_rule'
|
||||
resources_key = 'security_group_rules'
|
||||
base_path = '/security-group-rules'
|
||||
|
@ -30,6 +30,7 @@ class SecurityGroupRule(resource.Resource):
|
|||
'remote_group_id', 'security_group_id',
|
||||
ether_type='ethertype',
|
||||
project_id='tenant_id',
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
|
||||
)
|
||||
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class Subnet(resource.Resource, tag.TagMixin):
|
||||
class Subnet(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'subnet'
|
||||
resources_key = 'subnets'
|
||||
base_path = '/subnets'
|
||||
|
@ -35,7 +34,7 @@ class Subnet(resource.Resource, tag.TagMixin):
|
|||
project_id='tenant_id',
|
||||
subnet_pool_id='subnetpool_id',
|
||||
use_default_subnet_pool='use_default_subnetpool',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -10,11 +10,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack.network.v2 import tag
|
||||
from openstack import resource
|
||||
|
||||
|
||||
class SubnetPool(resource.Resource, tag.TagMixin):
|
||||
class SubnetPool(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'subnetpool'
|
||||
resources_key = 'subnetpools'
|
||||
base_path = '/subnetpools'
|
||||
|
@ -31,7 +30,7 @@ class SubnetPool(resource.Resource, tag.TagMixin):
|
|||
'name',
|
||||
is_shared='shared',
|
||||
project_id='tenant_id',
|
||||
**tag.TagMixin._tag_query_parameters
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from openstack import resource
|
||||
from openstack import utils
|
||||
|
||||
|
||||
class TagMixin(object):
|
||||
|
||||
_tag_query_parameters = {
|
||||
'tags': 'tags',
|
||||
'any_tags': 'tags-any',
|
||||
'not_tags': 'not-tags',
|
||||
'not_any_tags': 'not-tags-any',
|
||||
}
|
||||
|
||||
#: A list of associated tags
|
||||
#: *Type: list of tag strings*
|
||||
tags = resource.Body('tags', type=list, default=[])
|
||||
|
||||
def set_tags(self, session, tags):
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags')
|
||||
session.put(url,
|
||||
json={'tags': tags})
|
||||
self._body.attributes.update({'tags': tags})
|
||||
return self
|
|
@ -14,7 +14,7 @@ from openstack import resource
|
|||
from openstack import utils
|
||||
|
||||
|
||||
class Trunk(resource.Resource):
|
||||
class Trunk(resource.Resource, resource.TagMixin):
|
||||
resource_key = 'trunk'
|
||||
resources_key = 'trunks'
|
||||
base_path = '/trunks'
|
||||
|
@ -30,6 +30,7 @@ class Trunk(resource.Resource):
|
|||
'name', 'description', 'port_id', 'status', 'sub_ports',
|
||||
project_id='tenant_id',
|
||||
is_admin_state_up='admin_state_up',
|
||||
**resource.TagMixin._tag_query_parameters
|
||||
)
|
||||
|
||||
# Properties
|
||||
|
|
|
@ -1439,6 +1439,119 @@ class Resource(dict):
|
|||
"No %s found for %s" % (cls.__name__, name_or_id))
|
||||
|
||||
|
||||
class TagMixin(object):
|
||||
|
||||
_tag_query_parameters = {
|
||||
'tags': 'tags',
|
||||
'any_tags': 'tags-any',
|
||||
'not_tags': 'not-tags',
|
||||
'not_any_tags': 'not-tags-any',
|
||||
}
|
||||
|
||||
#: A list of associated tags
|
||||
#: *Type: list of tag strings*
|
||||
tags = Body('tags', type=list, default=[])
|
||||
|
||||
def fetch_tags(self, session):
|
||||
"""Lists tags set on the entity.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:return: The list with tags attached to the entity
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags')
|
||||
session = self._get_session(session)
|
||||
response = session.get(url)
|
||||
exceptions.raise_from_response(response)
|
||||
# NOTE(gtema): since this is a common method
|
||||
# we can't rely on the resource_key, because tags are returned
|
||||
# without resource_key. Do parse response here
|
||||
json = response.json()
|
||||
if 'tags' in json:
|
||||
self._body.attributes.update({'tags': json['tags']})
|
||||
return self
|
||||
|
||||
def set_tags(self, session, tags=[]):
|
||||
"""Sets/Replaces all tags on the resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param list tags: List with tags to be set on the resource
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags')
|
||||
session = self._get_session(session)
|
||||
response = session.put(url, json={'tags': tags})
|
||||
exceptions.raise_from_response(response)
|
||||
self._body.attributes.update({'tags': tags})
|
||||
return self
|
||||
|
||||
def remove_all_tags(self, session):
|
||||
"""Removes all tags on the entity.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags')
|
||||
session = self._get_session(session)
|
||||
response = session.delete(url)
|
||||
exceptions.raise_from_response(response)
|
||||
self._body.attributes.update({'tags': []})
|
||||
return self
|
||||
|
||||
def check_tag(self, session, tag):
|
||||
"""Checks if tag exists on the entity.
|
||||
|
||||
If the tag does not exist a 404 will be returned
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param tag: The tag as a string.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags', tag)
|
||||
session = self._get_session(session)
|
||||
response = session.get(url)
|
||||
exceptions.raise_from_response(response,
|
||||
error_message='Tag does not exist')
|
||||
return self
|
||||
|
||||
def add_tag(self, session, tag):
|
||||
"""Adds a single tag to the resource.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param tag: The tag as a string.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags', tag)
|
||||
session = self._get_session(session)
|
||||
response = session.put(url)
|
||||
exceptions.raise_from_response(response)
|
||||
# we do not want to update tags directly
|
||||
tags = self.tags
|
||||
tags.append(tag)
|
||||
self._body.attributes.update({
|
||||
'tags': tags
|
||||
})
|
||||
return self
|
||||
|
||||
def remove_tag(self, session, tag):
|
||||
"""Removes a single tag from the specified server.
|
||||
|
||||
:param session: The session to use for making this request.
|
||||
:param tag: The tag as a string.
|
||||
"""
|
||||
url = utils.urljoin(self.base_path, self.id, 'tags', tag)
|
||||
session = self._get_session(session)
|
||||
response = session.delete(url)
|
||||
exceptions.raise_from_response(response)
|
||||
# we do not want to update tags directly
|
||||
tags = self.tags
|
||||
try:
|
||||
# NOTE(gtema): if tags were not fetched, but request suceeded
|
||||
# it is ok. Just ensure tag does not exist locally
|
||||
tags.remove(tag)
|
||||
except ValueError:
|
||||
pass # do nothing!
|
||||
self._body.attributes.update({
|
||||
'tags': tags
|
||||
})
|
||||
return self
|
||||
|
||||
|
||||
def _normalize_status(status):
|
||||
if status is not None:
|
||||
status = status.lower()
|
||||
|
|
|
@ -93,9 +93,9 @@ class TestServer(base.TestCase):
|
|||
"reservation_id": "reservation_id",
|
||||
"project_id": "project_id",
|
||||
"tags": "tags",
|
||||
"tags_any": "tags-any",
|
||||
"any_tags": "tags-any",
|
||||
"not_tags": "not-tags",
|
||||
"not_tags_any": "not-tags-any",
|
||||
"not_any_tags": "not-tags-any",
|
||||
"is_deleted": "deleted",
|
||||
"ipv4_address": "ip",
|
||||
"ipv6_address": "ip6",
|
||||
|
|
|
@ -49,6 +49,10 @@ class TestProject(base.TestCase):
|
|||
'is_enabled': 'enabled',
|
||||
'limit': 'limit',
|
||||
'marker': 'marker',
|
||||
'tags': 'tags',
|
||||
'any_tags': 'tags-any',
|
||||
'not_tags': 'not-tags',
|
||||
'not_any_tags': 'not-tags-any',
|
||||
},
|
||||
sot._query_mapping._mapping)
|
||||
|
||||
|
|
|
@ -103,6 +103,8 @@ class TestImage(base.TestCase):
|
|||
self.resp.json = mock.Mock(return_value=self.resp.body)
|
||||
self.sess = mock.Mock(spec=adapter.Adapter)
|
||||
self.sess.post = mock.Mock(return_value=self.resp)
|
||||
self.sess.put = mock.Mock(return_value=FakeResponse({}))
|
||||
self.sess.delete = mock.Mock(return_value=FakeResponse({}))
|
||||
self.sess.default_microversion = None
|
||||
self.sess.retriable_status_codes = None
|
||||
|
||||
|
@ -197,7 +199,7 @@ class TestImage(base.TestCase):
|
|||
sot = image.Image(**EXAMPLE)
|
||||
tag = "lol"
|
||||
|
||||
self.assertIsNone(sot.add_tag(self.sess, tag))
|
||||
sot.add_tag(self.sess, tag)
|
||||
self.sess.put.assert_called_with(
|
||||
'images/IDENTIFIER/tags/%s' % tag,
|
||||
)
|
||||
|
@ -206,7 +208,7 @@ class TestImage(base.TestCase):
|
|||
sot = image.Image(**EXAMPLE)
|
||||
tag = "lol"
|
||||
|
||||
self.assertIsNone(sot.remove_tag(self.sess, tag))
|
||||
sot.remove_tag(self.sess, tag)
|
||||
self.sess.delete.assert_called_with(
|
||||
'images/IDENTIFIER/tags/%s' % tag,
|
||||
)
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import inspect
|
||||
import mock
|
||||
from openstack.tests.unit import base
|
||||
|
||||
from openstack.network.v2 import network
|
||||
import openstack.network.v2 as network_resources
|
||||
from openstack.network.v2.tag import TagMixin
|
||||
|
||||
ID = 'IDENTIFIER'
|
||||
|
||||
|
||||
class TestTag(base.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def _create_network_resource(tags=None):
|
||||
tags = tags or []
|
||||
return network.Network(id=ID, name='test-net', tags=tags)
|
||||
|
||||
def test_tags_attribute(self):
|
||||
net = self._create_network_resource()
|
||||
self.assertTrue(hasattr(net, 'tags'))
|
||||
self.assertIsInstance(net.tags, list)
|
||||
|
||||
def test_set_tags(self):
|
||||
net = self._create_network_resource()
|
||||
sess = mock.Mock()
|
||||
result = net.set_tags(sess, ['blue', 'green'])
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual(['blue', 'green'], net.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(net, result)
|
||||
url = 'networks/' + ID + '/tags'
|
||||
sess.put.assert_called_once_with(url,
|
||||
json={'tags': ['blue', 'green']})
|
||||
|
||||
def test_tagged_resource_always_created_with_empty_tag_list(self):
|
||||
for _, module in inspect.getmembers(network_resources,
|
||||
inspect.ismodule):
|
||||
for _, resource in inspect.getmembers(module, inspect.isclass):
|
||||
if issubclass(resource, TagMixin) and resource != TagMixin:
|
||||
x_resource = resource.new(
|
||||
id="%s_ID" % resource.resource_key.upper())
|
||||
self.assertIsNotNone(x_resource.tags)
|
||||
self.assertEqual(x_resource.tags, list())
|
|
@ -2226,3 +2226,166 @@ class TestAssertMicroversionFor(base.TestCase):
|
|||
self.res._assert_microversion_for,
|
||||
self.session, 'fetch', '1.6')
|
||||
mock_get_ver.assert_called_once_with(self.res, self.session, 'fetch')
|
||||
|
||||
|
||||
class TestTagMixin(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestTagMixin, self).setUp()
|
||||
|
||||
self.service_name = "service"
|
||||
self.base_path = "base_path"
|
||||
|
||||
class Test(resource.Resource, resource.TagMixin):
|
||||
service = self.service_name
|
||||
base_path = self.base_path
|
||||
resources_key = 'resources'
|
||||
allow_create = True
|
||||
allow_fetch = True
|
||||
allow_head = True
|
||||
allow_commit = True
|
||||
allow_delete = True
|
||||
allow_list = True
|
||||
|
||||
self.test_class = Test
|
||||
|
||||
self.request = mock.Mock(spec=resource._Request)
|
||||
self.request.url = "uri"
|
||||
self.request.body = "body"
|
||||
self.request.headers = "headers"
|
||||
|
||||
self.response = FakeResponse({})
|
||||
|
||||
self.sot = Test.new(id="id", tags=[])
|
||||
self.sot._prepare_request = mock.Mock(return_value=self.request)
|
||||
self.sot._translate_response = mock.Mock()
|
||||
|
||||
self.session = mock.Mock(spec=adapter.Adapter)
|
||||
self.session.get = mock.Mock(return_value=self.response)
|
||||
self.session.put = mock.Mock(return_value=self.response)
|
||||
self.session.delete = mock.Mock(return_value=self.response)
|
||||
|
||||
def test_tags_attribute(self):
|
||||
res = self.sot
|
||||
self.assertTrue(hasattr(res, 'tags'))
|
||||
self.assertIsInstance(res.tags, list)
|
||||
|
||||
def test_fetch_tags(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.links = {}
|
||||
mock_response.json.return_value = {'tags': ['blue1', 'green1']}
|
||||
|
||||
sess.get.side_effect = [mock_response]
|
||||
|
||||
result = res.fetch_tags(sess)
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual(['blue1', 'green1'], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags'
|
||||
sess.get.assert_called_once_with(url)
|
||||
|
||||
def test_set_tags(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
# Set some initial value to check rewrite
|
||||
res.tags.extend(['blue_old', 'green_old'])
|
||||
|
||||
result = res.set_tags(sess, ['blue', 'green'])
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual(['blue', 'green'], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags'
|
||||
sess.put.assert_called_once_with(
|
||||
url,
|
||||
json={'tags': ['blue', 'green']}
|
||||
)
|
||||
|
||||
def test_remove_all_tags(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
# Set some initial value to check removal
|
||||
res.tags.extend(['blue_old', 'green_old'])
|
||||
|
||||
result = res.remove_all_tags(sess)
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual([], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags'
|
||||
sess.delete.assert_called_once_with(url)
|
||||
|
||||
def test_remove_single_tag(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
res.tags.extend(['blue', 'dummy'])
|
||||
|
||||
result = res.remove_tag(sess, 'dummy')
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual(['blue'], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags/dummy'
|
||||
sess.delete.assert_called_once_with(url)
|
||||
|
||||
def test_check_tag_exists(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
sess.get.side_effect = [FakeResponse(None, 202)]
|
||||
|
||||
result = res.check_tag(sess, 'blue')
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual([], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags/blue'
|
||||
sess.get.assert_called_once_with(url)
|
||||
|
||||
def test_check_tag_not_exists(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
mock_response = mock.Mock()
|
||||
mock_response.status_code = 404
|
||||
mock_response.links = {}
|
||||
mock_response.content = None
|
||||
|
||||
sess.get.side_effect = [mock_response]
|
||||
|
||||
# ensure we get 404
|
||||
self.assertRaises(
|
||||
exceptions.NotFoundException,
|
||||
res.check_tag,
|
||||
sess,
|
||||
'dummy',
|
||||
)
|
||||
|
||||
def test_add_tag(self):
|
||||
res = self.sot
|
||||
sess = self.session
|
||||
|
||||
# Set some initial value to check add
|
||||
res.tags.extend(['blue', 'green'])
|
||||
|
||||
result = res.add_tag(sess, 'lila')
|
||||
# Check tags attribute is updated
|
||||
self.assertEqual(['blue', 'green', 'lila'], res.tags)
|
||||
# Check the passed resource is returned
|
||||
self.assertEqual(res, result)
|
||||
url = self.base_path + '/' + res.id + '/tags/lila'
|
||||
sess.put.assert_called_once_with(url)
|
||||
|
||||
def test_tagged_resource_always_created_with_empty_tag_list(self):
|
||||
res = self.sot
|
||||
|
||||
self.assertIsNotNone(res.tags)
|
||||
self.assertEqual(res.tags, list())
|
||||
|
|
Loading…
Reference in New Issue