Switch cloud layer to use proxy for DNS

- use dns proxy in cloud layer
- fix DNS pagination (not-standard - argh)
- fix wrongly set resource_key for recordset
- add create_recordset in functional tests of dns
- streamline cloud tests of dns
- add required find_recordset function to the proxy

Change-Id: I28745475506848e0f205efd4427cbe2e9e81df17
Story: 2005749
Task: 33416
This commit is contained in:
Artem Goncharov 2019-06-14 21:29:28 +02:00
parent b0501643b9
commit 245ebae563
14 changed files with 615 additions and 289 deletions

View File

@ -15,6 +15,8 @@
# openstack.resource.Resource.list and openstack.resource2.Resource.list
import types # noqa
from openstack import exceptions
from openstack import resource
from openstack.cloud import exc
from openstack.cloud import _normalize
from openstack.cloud import _utils
@ -22,24 +24,16 @@ from openstack.cloud import _utils
class DnsCloudMixin(_normalize.Normalizer):
@property
def _dns_client(self):
if 'dns' not in self._raw_clients:
dns_client = self._get_versioned_client(
'dns', min_version=2, max_version='2.latest')
self._raw_clients['dns'] = dns_client
return self._raw_clients['dns']
def list_zones(self):
def list_zones(self, filters=None):
"""List all available zones.
:returns: A list of zones dicts.
"""
data = self._dns_client.get(
"/zones",
error_message="Error fetching zones list")
return self._get_and_munchify('zones', data)
if not filters:
filters = {}
return list(self.dns.zones(allow_unknown_params=True,
**filters))
def get_zone(self, name_or_id, filters=None):
"""Get a zone by name or ID.
@ -47,17 +41,20 @@ class DnsCloudMixin(_normalize.Normalizer):
:param name_or_id: Name or ID of the zone
:param filters:
A dictionary of meta data to use for further filtering
OR
A string containing a jmespath expression for further filtering.
Example:: "[?last_name==`Smith`] | [?other.gender]==`Female`]"
:returns: A zone dict or None if no matching zone is found.
"""
return _utils._get_entity(self, 'zone', name_or_id, filters)
if not filters:
filters = {}
zone = self.dns.find_zone(
name_or_id=name_or_id, ignore_missing=True, **filters)
if not zone:
return False
return zone
def search_zones(self, name_or_id=None, filters=None):
zones = self.list_zones()
zones = self.list_zones(filters)
return _utils._filter_list(zones, name_or_id, filters)
def create_zone(self, name, zone_type=None, email=None, description=None,
@ -101,10 +98,12 @@ class DnsCloudMixin(_normalize.Normalizer):
if masters is not None:
zone["masters"] = masters
data = self._dns_client.post(
"/zones", json=zone,
error_message="Unable to create zone {name}".format(name=name))
return self._get_and_munchify(key=None, data=data)
try:
return self.dns.create_zone(**zone)
except exceptions.SDKException as e:
raise exc.OpenStackCloudException(
"Unable to create zone {name}".format(name=name)
)
@_utils.valid_kwargs('email', 'description', 'ttl', 'masters')
def update_zone(self, name_or_id, **kwargs):
@ -127,10 +126,7 @@ class DnsCloudMixin(_normalize.Normalizer):
raise exc.OpenStackCloudException(
"Zone %s not found." % name_or_id)
data = self._dns_client.patch(
"/zones/{zone_id}".format(zone_id=zone['id']), json=kwargs,
error_message="Error updating zone {0}".format(name_or_id))
return self._get_and_munchify(key=None, data=data)
return self.dns.update_zone(zone['id'], **kwargs)
def delete_zone(self, name_or_id):
"""Delete a zone.
@ -142,54 +138,56 @@ class DnsCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error.
"""
zone = self.get_zone(name_or_id)
if zone is None:
zone = self.dns.find_zone(name_or_id)
if not zone:
self.log.debug("Zone %s not found for deleting", name_or_id)
return False
return self._dns_client.delete(
"/zones/{zone_id}".format(zone_id=zone['id']),
error_message="Error deleting zone {0}".format(name_or_id))
self.dns.delete_zone(zone)
return True
def list_recordsets(self, zone):
"""List all available recordsets.
:param zone: Name or ID of the zone managing the recordset
:param zone: Name, ID or :class:`openstack.dns.v2.zone.Zone` instance
of the zone managing the recordset.
:returns: A list of recordsets.
"""
zone_obj = self.get_zone(zone)
if isinstance(zone, resource.Resource):
zone_obj = zone
else:
zone_obj = self.get_zone(zone)
if zone_obj is None:
raise exc.OpenStackCloudException(
"Zone %s not found." % zone)
return self._dns_client.get(
"/zones/{zone_id}/recordsets".format(zone_id=zone_obj['id']),
error_message="Error fetching recordsets list")['recordsets']
return list(self.dns.recordsets(zone_obj))
def get_recordset(self, zone, name_or_id):
"""Get a recordset by name or ID.
:param zone: Name or ID of the zone managing the recordset
:param zone: Name, ID or :class:`openstack.dns.v2.zone.Zone` instance
of the zone managing the recordset.
:param name_or_id: Name or ID of the recordset
:returns: A recordset dict or None if no matching recordset is
:returns: A recordset dict or False if no matching recordset is
found.
"""
zone_obj = self.get_zone(zone)
if zone_obj is None:
if isinstance(zone, resource.Resource):
zone_obj = zone
else:
zone_obj = self.get_zone(zone)
if not zone_obj:
raise exc.OpenStackCloudException(
"Zone %s not found." % zone)
try:
return self._dns_client.get(
"/zones/{zone_id}/recordsets/{recordset_id}".format(
zone_id=zone_obj['id'], recordset_id=name_or_id),
error_message="Error fetching recordset")
return self.dns.find_recordset(
zone=zone_obj, name_or_id=name_or_id, ignore_missing=False)
except Exception:
return None
return False
def search_recordsets(self, zone, name_or_id=None, filters=None):
recordsets = self.list_recordsets(zone=zone)
@ -199,7 +197,8 @@ class DnsCloudMixin(_normalize.Normalizer):
description=None, ttl=None):
"""Create a recordset.
:param zone: Name or ID of the zone managing the recordset
:param zone: Name, ID or :class:`openstack.dns.v2.zone.Zone` instance
of the zone managing the recordset.
:param name: Name of the recordset
:param recordset_type: Type of the recordset
:param records: List of the recordset definitions
@ -211,8 +210,11 @@ class DnsCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error.
"""
zone_obj = self.get_zone(zone)
if zone_obj is None:
if isinstance(zone, resource.Resource):
zone_obj = zone
else:
zone_obj = self.get_zone(zone)
if not zone_obj:
raise exc.OpenStackCloudException(
"Zone %s not found." % zone)
@ -231,16 +233,14 @@ class DnsCloudMixin(_normalize.Normalizer):
if ttl:
body['ttl'] = ttl
return self._dns_client.post(
"/zones/{zone_id}/recordsets".format(zone_id=zone_obj['id']),
json=body,
error_message="Error creating recordset {name}".format(name=name))
return self.dns.create_recordset(zone=zone_obj, **body)
@_utils.valid_kwargs('description', 'ttl', 'records')
def update_recordset(self, zone, name_or_id, **kwargs):
"""Update a recordset.
:param zone: Name or ID of the zone managing the recordset
:param zone: Name, ID or :class:`openstack.dns.v2.zone.Zone` instance
of the zone managing the recordset.
:param name_or_id: Name or ID of the recordset being updated.
:param records: List of the recordset definitions
:param description: Description of the recordset
@ -250,27 +250,21 @@ class DnsCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error.
"""
zone_obj = self.get_zone(zone)
if zone_obj is None:
raise exc.OpenStackCloudException(
"Zone %s not found." % zone)
recordset_obj = self.get_recordset(zone, name_or_id)
if recordset_obj is None:
rs = self.get_recordset(zone, name_or_id)
if not rs:
raise exc.OpenStackCloudException(
"Recordset %s not found." % name_or_id)
new_recordset = self._dns_client.put(
"/zones/{zone_id}/recordsets/{recordset_id}".format(
zone_id=zone_obj['id'], recordset_id=name_or_id), json=kwargs,
error_message="Error updating recordset {0}".format(name_or_id))
rs = self.dns.update_recordset(recordset=rs, **kwargs)
return new_recordset
return rs
def delete_recordset(self, zone, name_or_id):
"""Delete a recordset.
:param zone: Name or ID of the zone managing the recordset.
:param zone: Name, ID or :class:`openstack.dns.v2.zone.Zone` instance
of the zone managing the recordset.
:param name_or_id: Name or ID of the recordset being deleted.
:returns: True if delete succeeded, False otherwise.
@ -278,19 +272,11 @@ class DnsCloudMixin(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error.
"""
zone_obj = self.get_zone(zone)
if zone_obj is None:
self.log.debug("Zone %s not found for deleting", zone)
return False
recordset = self.get_recordset(zone_obj['id'], name_or_id)
if recordset is None:
recordset = self.get_recordset(zone, name_or_id)
if not recordset:
self.log.debug("Recordset %s not found for deleting", name_or_id)
return False
self._dns_client.delete(
"/zones/{zone_id}/recordsets/{recordset_id}".format(
zone_id=zone_obj['id'], recordset_id=name_or_id),
error_message="Error deleting recordset {0}".format(name_or_id))
self.dns.delete_recordset(recordset, ignore_missing=False)
return True

105
openstack/dns/v2/_base.py Normal file
View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from openstack import exceptions
from openstack import resource
class Resource(resource.Resource):
@classmethod
def find(cls, session, name_or_id, ignore_missing=True, **params):
"""Find a resource by its name or id.
:param session: The session to use for making this request.
:type session: :class:`~keystoneauth1.adapter.Adapter`
:param name_or_id: This resource's identifier, if needed by
the request. The default is ``None``.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the resource does not exist.
When set to ``True``, None will be returned when
attempting to find a nonexistent resource.
:param dict params: Any additional parameters to be passed into
underlying methods, such as to
:meth:`~openstack.resource.Resource.existing`
in order to pass on URI parameters.
:return: The :class:`Resource` object matching the given name or id
or None if nothing matches.
:raises: :class:`openstack.exceptions.DuplicateResource` if more
than one resource is found for this request.
:raises: :class:`openstack.exceptions.ResourceNotFound` if nothing
is found and ignore_missing is ``False``.
"""
session = cls._get_session(session)
# Try to short-circuit by looking directly for a matching ID.
try:
match = cls.existing(
id=name_or_id,
connection=session._get_connection(),
**params)
return match.fetch(session, **params)
except exceptions.SDKException:
# DNS may return 400 when we try to do GET with name
pass
if ('name' in cls._query_mapping._mapping.keys()
and 'name' not in params):
params['name'] = name_or_id
data = cls.list(session, **params)
result = cls._get_one_match(name_or_id, data)
if result is not None:
return result
if ignore_missing:
return None
raise exceptions.ResourceNotFound(
"No %s found for %s" % (cls.__name__, name_or_id))
@classmethod
def _get_next_link(cls, uri, response, data, marker, limit, total_yielded):
next_link = None
params = {}
if isinstance(data, dict):
links = data.get('links')
if links:
next_link = links.get('next')
total = data.get('metadata', {}).get('total_count')
if total:
# We have a kill switch
total_count = int(total)
if total_count <= total_yielded:
return None, params
# Parse params from Link (next page URL) into params.
# This prevents duplication of query parameters that with large
# number of pages result in HTTP 414 error eventually.
if next_link:
parts = six.moves.urllib.parse.urlparse(next_link)
query_params = six.moves.urllib.parse.parse_qs(parts.query)
params.update(query_params)
next_link = six.moves.urllib.parse.urljoin(next_link,
parts.path)
# If we still have no link, and limit was given and is non-zero,
# and the number of records yielded equals the limit, then the user
# is playing pagination ball so we should go ahead and try once more.
if not next_link and limit:
next_link = uri
params['marker'] = marker
params['limit'] = limit
return next_link, params

View File

@ -88,7 +88,7 @@ class Proxy(proxy.Proxy):
"""
return self._update(_zone.Zone, zone, **attrs)
def find_zone(self, name_or_id, ignore_missing=True):
def find_zone(self, name_or_id, ignore_missing=True, **attrs):
"""Find a single zone
:param name_or_id: The name or ID of a zone
@ -216,6 +216,25 @@ class Proxy(proxy.Proxy):
return self._delete(_rs.Recordset, recordset,
ignore_missing=ignore_missing)
def find_recordset(self, zone, name_or_id, ignore_missing=True, **attrs):
"""Find a single recordset
:param zone: The value can be the ID of a zone
or a :class:`~openstack.dns.v2.zone.Zone` instance.
:param name_or_id: The name or ID of a zone
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be raised
when the zone does not exist.
When set to ``True``, no exception will be set when attempting
to delete a nonexistent zone.
:returns: :class:`~openstack.dns.v2.recordset.Recordset`
"""
zone = self._get_resource(_zone.Zone, zone)
return self._find(_rs.Recordset, name_or_id,
ignore_missing=ignore_missing, zone_id=zone.id,
**attrs)
# ======== Zone Imports ========
def zone_imports(self, **query):
"""Retrieve a generator of zone imports

View File

@ -9,11 +9,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# from openstack import exceptions
from openstack import resource
from openstack.dns.v2 import _base
class FloatingIP(resource.Resource):
class FloatingIP(_base.Resource):
"""DNS Floating IP Resource"""
resource_key = ''
resources_key = 'floatingips'

View File

@ -9,13 +9,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# from openstack import exceptions
from openstack import resource
from openstack.dns.v2 import _base
class Recordset(resource.Resource):
class Recordset(_base.Resource):
"""DNS Recordset Resource"""
resource_key = 'recordset'
resources_key = 'recordsets'
base_path = '/zones/%(zone_id)s/recordsets'

View File

@ -13,8 +13,10 @@ from openstack import exceptions
from openstack import resource
from openstack import utils
from openstack.dns.v2 import _base
class Zone(resource.Resource):
class Zone(_base.Resource):
"""DNS ZONE Resource"""
resources_key = 'zones'
base_path = '/zones'

View File

@ -12,8 +12,10 @@
from openstack import exceptions
from openstack import resource
from openstack.dns.v2 import _base
class ZoneExport(resource.Resource):
class ZoneExport(_base.Resource):
"""DNS Zone Exports Resource"""
resource_key = ''
resources_key = 'exports'

View File

@ -12,8 +12,10 @@
from openstack import exceptions
from openstack import resource
from openstack.dns.v2 import _base
class ZoneImport(resource.Resource):
class ZoneImport(_base.Resource):
"""DNS Zone Import Resource"""
resource_key = ''
resources_key = 'imports'

View File

@ -11,8 +11,10 @@
# under the License.
from openstack import resource
from openstack.dns.v2 import _base
class ZoneTransferBase(resource.Resource):
class ZoneTransferBase(_base.Resource):
"""DNS Zone Transfer Request/Accept Base Resource"""
_query_mapping = resource.QueryParameters(

View File

@ -50,3 +50,14 @@ class TestZone(base.BaseFunctionalTest):
def test_list_zones(self):
names = [f.name for f in self.conn.dns.zones()]
self.assertIn(self.ZONE_NAME, names)
def test_create_rs(self):
zone = self.conn.dns.get_zone(self.zone)
self.assertIsNotNone(self.conn.dns.create_recordset(
zone=zone,
name='www.{zone}'.format(zone=zone.name),
type='A',
description='Example zone rec',
ttl=3600,
records=['192.168.1.1']
))

View File

@ -9,36 +9,28 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
import openstack.cloud
from openstack import exceptions
from openstack.tests.unit import base
from openstack.tests.unit.cloud import test_zone
zone = {
'id': '1',
'name': 'example.net.',
'type': 'PRIMARY',
'email': 'test@example.net',
'description': 'Example zone',
'ttl': 3600,
}
zone = test_zone.zone_dict
recordset = {
'name': 'www.example.net.',
'type': 'A',
'description': 'Example zone',
'description': 'Example zone rec',
'ttl': 3600,
'records': ['192.168.1.1']
'records': ['192.168.1.1'],
'id': '1',
'zone_id': zone['id'],
'zone_name': zone['name']
}
recordset_zone = '1'
new_recordset = copy.copy(recordset)
new_recordset['id'] = '1'
new_recordset['zone'] = recordset_zone
class RecordsetTestWrapper(test_zone.ZoneTestWrapper):
pass
class TestRecordset(base.TestCase):
@ -47,43 +39,86 @@ class TestRecordset(base.TestCase):
super(TestRecordset, self).setUp()
self.use_designate()
def test_create_recordset(self):
def test_create_recordset_zoneid(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
dict(method='POST',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'], 'recordsets']),
json=new_recordset,
validate=dict(json=recordset)),
json=fake_rs.get_create_response_json(),
validate=dict(json={
"records": fake_rs['records'],
"type": fake_rs['type'],
"name": fake_rs['name'],
"description": fake_rs['description'],
"ttl": fake_rs['ttl']
})),
])
rs = self.cloud.create_recordset(
zone=recordset_zone,
name=recordset['name'],
recordset_type=recordset['type'],
records=recordset['records'],
description=recordset['description'],
ttl=recordset['ttl'])
self.assertEqual(new_recordset, rs)
zone=fake_zone['id'],
name=fake_rs['name'],
recordset_type=fake_rs['type'],
records=fake_rs['records'],
description=fake_rs['description'],
ttl=fake_rs['ttl'])
fake_rs.cmp(rs)
self.assert_calls()
def test_create_recordset_zonename(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={'zones': [fake_zone.get_get_response_json()]}),
dict(method='POST',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'], 'recordsets']),
json=fake_rs.get_create_response_json(),
validate=dict(json={
"records": fake_rs['records'],
"type": fake_rs['type'],
"name": fake_rs['name'],
"description": fake_rs['description'],
"ttl": fake_rs['ttl']
})),
])
rs = self.cloud.create_recordset(
zone=fake_zone['name'],
name=fake_rs['name'],
recordset_type=fake_rs['type'],
records=fake_rs['records'],
description=fake_rs['description'],
ttl=fake_rs['ttl'])
fake_rs.cmp(rs)
self.assert_calls()
def test_create_recordset_exception(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
dict(method='POST',
uri=self.get_mock_url(
'dns', 'public',
@ -94,149 +129,238 @@ class TestRecordset(base.TestCase):
'records': ['192.168.1.2'],
'type': 'A'})),
])
with testtools.ExpectedException(
openstack.cloud.exc.OpenStackCloudHTTPError,
"Error creating recordset www2.example.net."
):
self.cloud.create_recordset('1', 'www2.example.net.',
'a', ['192.168.1.2'])
self.assertRaises(
exceptions.SDKException,
self.cloud.create_recordset,
fake_zone['id'], 'www2.example.net.', 'a', ['192.168.1.2']
)
self.assert_calls()
def test_update_recordset(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
new_ttl = 7200
expected_recordset = {
'name': recordset['name'],
'records': recordset['records'],
'type': recordset['type']
}
expected_recordset = recordset.copy()
expected_recordset['ttl'] = new_ttl
updated_rs = RecordsetTestWrapper(self, expected_recordset)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'],
'recordsets', new_recordset['id']]),
json=new_recordset),
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={'zones': [fake_zone.get_get_response_json()]}),
# try directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets', fake_rs['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets'],
qs_elements=['name={name}'.format(name=fake_rs['name'])]),
json={'recordsets': [fake_rs.get_get_response_json()]}),
# update
dict(method='PUT',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'],
'recordsets', new_recordset['id']]),
json=expected_recordset,
append=['v2', 'zones', fake_zone['id'],
'recordsets', fake_rs['id']]),
json=updated_rs.get_get_response_json(),
validate=dict(json={'ttl': new_ttl}))
])
updated_rs = self.cloud.update_recordset('1', '1', ttl=new_ttl)
self.assertEqual(expected_recordset, updated_rs)
res = self.cloud.update_recordset(
fake_zone['name'], fake_rs['name'], ttl=new_ttl)
updated_rs.cmp(res)
self.assert_calls()
def test_delete_recordset(self):
def test_list_recordsets(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'],
'recordsets', new_recordset['id']]),
json=new_recordset),
append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'], 'recordsets']),
json={'recordsets': [fake_rs.get_get_response_json()],
'links': {
'next': self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets?limit=1&marker=asd']),
'self': self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets?limit=1'])},
'metadata':{'total_count': 2}}),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'], 'recordsets'],
qs_elements=[
'limit=1', 'marker=asd']),
json={'recordsets': [fake_rs.get_get_response_json()]}),
])
res = self.cloud.list_recordsets(fake_zone['id'])
self.assertEqual(2, len(res))
self.assert_calls()
def test_delete_recordset(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={'zones': [fake_zone.get_get_response_json()]}),
# try directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets', fake_rs['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets'],
qs_elements=[
'name={name}'.format(name=fake_rs['name'])]),
json={'recordsets': [fake_rs.get_get_response_json()]}),
dict(method='DELETE',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', zone['id'],
'recordsets', new_recordset['id']]),
json={})
'recordsets', fake_rs['id']]),
status_code=202)
])
self.assertTrue(self.cloud.delete_recordset('1', '1'))
self.assertTrue(
self.cloud.delete_recordset(fake_zone['name'], fake_rs['name']))
self.assert_calls()
def test_get_recordset_by_id(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', '1', 'recordsets', '1']),
json=new_recordset),
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={'zones': [fake_zone.get_get_response_json()]}),
# try directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets', fake_rs['id']]),
json=fake_rs.get_get_response_json())
])
recordset = self.cloud.get_recordset('1', '1')
self.assertEqual(recordset['id'], '1')
res = self.cloud.get_recordset(fake_zone['name'], fake_rs['id'])
fake_rs.cmp(res)
self.assert_calls()
def test_get_recordset_by_name(self):
fake_zone = test_zone.ZoneTestWrapper(self, zone)
fake_rs = RecordsetTestWrapper(self, recordset)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', '1', 'recordsets',
new_recordset['name']]),
json=new_recordset)
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={'zones': [fake_zone.get_get_response_json()]}),
# try directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets', fake_rs['name']]),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets'],
qs_elements=['name={name}'.format(name=fake_rs['name'])]),
json={'recordsets': [fake_rs.get_get_response_json()]})
])
recordset = self.cloud.get_recordset('1', new_recordset['name'])
self.assertEqual(new_recordset['name'], recordset['name'])
res = self.cloud.get_recordset(fake_zone['name'], fake_rs['name'])
fake_rs.cmp(res)
self.assert_calls()
def test_get_recordset_not_found_returns_false(self):
recordset_name = "www.nonexistingrecord.net."
fake_zone = test_zone.ZoneTestWrapper(self, zone)
self.register_uris([
# try by directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [zone],
"links": {},
"metadata": {
'total_count': 1}}),
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
# try directly
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', '1', 'recordsets',
recordset_name]),
json=[])
append=['v2', 'zones', fake_zone['id'],
'recordsets', 'fake']),
status_code=404),
# list with name
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones', fake_zone['id'],
'recordsets'],
qs_elements=['name=fake']),
json={'recordsets': []})
])
recordset = self.cloud.get_recordset('1', recordset_name)
self.assertFalse(recordset)
res = self.cloud.get_recordset(fake_zone['id'], 'fake')
self.assertFalse(res)
self.assert_calls()

View File

@ -9,13 +9,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
import openstack.cloud
from openstack.tests.unit import base
from openstack import exceptions
zone_dict = {
'name': 'example.net.',
@ -23,10 +22,35 @@ zone_dict = {
'email': 'test@example.net',
'description': 'Example zone',
'ttl': 3600,
'id': '1'
}
new_zone_dict = copy.copy(zone_dict)
new_zone_dict['id'] = '1'
class ZoneTestWrapper(object):
def __init__(self, ut, attrs):
self.remote_res = attrs
self.ut = ut
def get_create_response_json(self):
return self.remote_res
def get_get_response_json(self):
return self.remote_res
def __getitem__(self, key):
"""Dict access to be able to access properties easily
"""
return self.remote_res[key]
def cmp(self, other):
ut = self.ut
me = self.remote_res
for k, v in me.items():
# Go over known attributes. We might of course compare others,
# but not necessary here
ut.assertEqual(v, other[k])
class TestZone(base.TestCase):
@ -36,13 +60,19 @@ class TestZone(base.TestCase):
self.use_designate()
def test_create_zone(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json=new_zone_dict,
validate=dict(
json=zone_dict))
json=fake_zone.get_create_response_json(),
validate=dict(json={
'description': zone_dict['description'],
'email': zone_dict['email'],
'name': zone_dict['name'],
'ttl': zone_dict['ttl'],
'type': 'PRIMARY'
}))
])
z = self.cloud.create_zone(
name=zone_dict['name'],
@ -51,7 +81,7 @@ class TestZone(base.TestCase):
description=zone_dict['description'],
ttl=zone_dict['ttl'],
masters=None)
self.assertEqual(new_zone_dict, z)
fake_zone.cmp(z)
self.assert_calls()
def test_create_zone_exception(self):
@ -61,96 +91,127 @@ class TestZone(base.TestCase):
'dns', 'public', append=['v2', 'zones']),
status_code=500)
])
with testtools.ExpectedException(
openstack.cloud.exc.OpenStackCloudHTTPError,
"Unable to create zone example.net."
):
self.cloud.create_zone('example.net.')
self.assertRaises(
exceptions.SDKException,
self.cloud.create_zone,
'example.net.'
)
self.assert_calls()
def test_update_zone(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
new_ttl = 7200
updated_zone = copy.copy(new_zone_dict)
updated_zone['ttl'] = new_ttl
updated_zone_dict = copy.copy(zone_dict)
updated_zone_dict['ttl'] = new_ttl
updated_zone = ZoneTestWrapper(self, updated_zone_dict)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [new_zone_dict],
"links": {},
"metadata": {
'total_count': 1}}),
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
dict(method='PATCH',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones', '1']),
json=updated_zone,
validate=dict(
json={"ttl": new_ttl}))
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=updated_zone.get_get_response_json(),
validate=dict(json={"ttl": new_ttl}))
])
z = self.cloud.update_zone('1', ttl=new_ttl)
self.assertEqual(updated_zone, z)
z = self.cloud.update_zone(fake_zone['id'], ttl=new_ttl)
updated_zone.cmp(z)
self.assert_calls()
def test_delete_zone(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [new_zone_dict],
"links": {},
"metadata": {
'total_count': 1}}),
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json()),
dict(method='DELETE',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones', '1']),
json=new_zone_dict)
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
status_code=202)
])
self.assertTrue(self.cloud.delete_zone('1'))
self.assertTrue(self.cloud.delete_zone(fake_zone['id']))
self.assert_calls()
def test_get_zone_by_id(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [new_zone_dict],
"links": {},
"metadata": {
'total_count': 1}})
'dns', 'public', append=['v2', 'zones', fake_zone['id']]),
json=fake_zone.get_get_response_json())
])
zone = self.cloud.get_zone('1')
self.assertEqual(zone['id'], '1')
res = self.cloud.get_zone(fake_zone['id'])
fake_zone.cmp(res)
self.assert_calls()
def test_get_zone_by_name(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [new_zone_dict],
"links": {},
"metadata": {
'total_count': 1}})
'dns', 'public',
append=['v2', 'zones', fake_zone['name']]),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=[
'name={name}'.format(name=fake_zone['name'])]),
json={"zones": [fake_zone.get_get_response_json()]})
])
zone = self.cloud.get_zone('example.net.')
self.assertEqual(zone['name'], 'example.net.')
res = self.cloud.get_zone(fake_zone['name'])
fake_zone.cmp(res)
self.assert_calls()
def test_get_zone_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones']),
json={
"zones": [],
"links": {},
"metadata": {
'total_count': 1}})
'dns', 'public',
append=['v2', 'zones', 'nonexistingzone.net.']),
status_code=404),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public', append=['v2', 'zones'],
qs_elements=['name=nonexistingzone.net.']),
json={"zones": []})
])
zone = self.cloud.get_zone('nonexistingzone.net.')
self.assertFalse(zone)
self.assert_calls()
def test_list_zones(self):
fake_zone = ZoneTestWrapper(self, zone_dict)
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones']),
json={'zones': [fake_zone.get_get_response_json()],
'links': {
'next': self.get_mock_url(
'dns', 'public',
append=['v2', 'zones',
'?limit=1&marker=asd']),
'self': self.get_mock_url(
'dns', 'public',
append=['v2', 'zones',
'?limit=1'])},
'metadata':{'total_count': 2}}),
dict(method='GET',
uri=self.get_mock_url(
'dns', 'public',
append=['v2', 'zones/'],
qs_elements=[
'limit=1', 'marker=asd']),
json={'zones': [fake_zone.get_get_response_json()]}),
])
res = self.cloud.list_zones()
# updated_rs.cmp(res)
self.assertEqual(2, len(res))
self.assert_calls()

View File

@ -50,14 +50,16 @@ class TestDnsZone(TestDnsProxy):
self.verify_update(self.proxy.update_zone, zone.Zone)
def test_zone_abandon(self):
self._verify("openstack.dns.v2.zone.Zone.abandon",
self.proxy.abandon_zone,
method_args=[{'zone': 'id'}])
self._verify2("openstack.dns.v2.zone.Zone.abandon",
self.proxy.abandon_zone,
method_args=[{'zone': 'id'}],
expected_args=[self.proxy])
def test_zone_xfr(self):
self._verify("openstack.dns.v2.zone.Zone.xfr",
self.proxy.xfr_zone,
method_args=[{'zone': 'id'}])
self._verify2("openstack.dns.v2.zone.Zone.xfr",
self.proxy.xfr_zone,
method_args=[{'zone': 'id'}],
expected_args=[self.proxy])
class TestDnsRecordset(TestDnsProxy):
@ -89,6 +91,15 @@ class TestDnsRecordset(TestDnsProxy):
method_kwargs={'zone': 'zid'},
expected_kwargs={'zone_id': 'zid'})
def test_recordset_find(self):
self._verify2("openstack.proxy.Proxy._find",
self.proxy.find_recordset,
method_args=['zone', 'rs'],
method_kwargs={},
expected_args=[recordset.Recordset, 'rs'],
expected_kwargs={'ignore_missing': True,
'zone_id': 'zone'})
class TestDnsFloatIP(TestDnsProxy):
def test_floating_ips(self):

View File

@ -40,7 +40,7 @@ class TestRecordset(base.TestCase):
def test_basic(self):
sot = recordset.Recordset()
self.assertEqual('recordset', sot.resource_key)
self.assertIsNone(sot.resource_key)
self.assertEqual('recordsets', sot.resources_key)
self.assertEqual('/zones/%(zone_id)s/recordsets', sot.base_path)
self.assertTrue(sot.allow_list)