Remove unused central code

The following unused rpc calls were removed.
- create_record
- update_record
- delete_record
- sync_record
- sync_zone
- sync_zones
- touch_zone

Change-Id: Iaccdb9b7d586ee04270dad656924a8867f8f7c2c
This commit is contained in:
Erik Olof Gunnar Andersson 2022-01-12 20:49:18 -08:00
parent 057329deb1
commit c38810b15f
18 changed files with 218 additions and 1442 deletions

View File

@ -93,10 +93,3 @@ class Backend(DriverPlugin):
:param context: Security context information.
:param zone: the DNS zone.
"""
def ping(self, context):
"""Ping the Backend service"""
return {
'status': None
}

View File

@ -54,6 +54,3 @@ class InfobloxBackend(base.Backend):
def delete_zone(self, context, zone):
LOG.info('Delete Zone %r', zone)
self.infoblox.delete_zone_auth(zone['name'][0:-1])
def ping(self, context):
LOG.info('Ping')

View File

@ -65,8 +65,9 @@ class CentralAPI(object):
6.1 - Add ServiceStatus methods
6.2 - Changed 'find_recordsets' method args
6.3 - Changed 'update_status' method args
6.4 - Removed unused record and diagnostic methods
"""
RPC_API_VERSION = '6.3'
RPC_API_VERSION = '6.4'
# This allows us to mark some methods as not logged.
# This can be for a few reasons - some methods my not actually call over
@ -79,7 +80,7 @@ class CentralAPI(object):
target = messaging.Target(topic=self.topic,
version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap='6.3')
self.client = rpc.get_client(target, version_cap='6.4')
@classmethod
def get_instance(cls):
@ -178,9 +179,6 @@ class CentralAPI(object):
def count_zones(self, context, criterion=None):
return self.client.call(context, 'count_zones', criterion=criterion)
def touch_zone(self, context, zone_id):
return self.client.call(context, 'touch_zone', zone_id=zone_id)
# TLD Methods
def create_tld(self, context, tld):
return self.client.call(context, 'create_tld', tld=tld)
@ -239,14 +237,6 @@ class CentralAPI(object):
criterion=criterion)
# Record Methods
def create_record(self, context, zone_id, recordset_id, record,
increment_serial=True):
return self.client.call(context, 'create_record',
zone_id=zone_id,
recordset_id=recordset_id,
record=record,
increment_serial=increment_serial)
def get_record(self, context, zone_id, recordset_id, record_id):
return self.client.call(context, 'get_record',
zone_id=zone_id,
@ -262,19 +252,6 @@ class CentralAPI(object):
def find_record(self, context, criterion=None):
return self.client.call(context, 'find_record', criterion=criterion)
def update_record(self, context, record, increment_serial=True):
return self.client.call(context, 'update_record',
record=record,
increment_serial=increment_serial)
def delete_record(self, context, zone_id, recordset_id, record_id,
increment_serial=True):
return self.client.call(context, 'delete_record',
zone_id=zone_id,
recordset_id=recordset_id,
record_id=record_id,
increment_serial=increment_serial)
def count_records(self, context, criterion=None):
return self.client.call(context, 'count_records', criterion=criterion)
@ -282,19 +259,6 @@ class CentralAPI(object):
def count_report(self, context, criterion=None):
return self.client.call(context, 'count_report', criterion=criterion)
# Sync Methods
def sync_zones(self, context):
return self.client.call(context, 'sync_zones')
def sync_zone(self, context, zone_id):
return self.client.call(context, 'sync_zone', zone_id=zone_id)
def sync_record(self, context, zone_id, recordset_id, record_id):
return self.client.call(context, 'sync_record',
zone_id=zone_id,
recordset_id=recordset_id,
record_id=record_id)
def list_floatingips(self, context):
return self.client.call(context, 'list_floatingips')

View File

@ -185,7 +185,7 @@ def notification(notification_type):
class Service(service.RPCService):
RPC_API_VERSION = '6.3'
RPC_API_VERSION = '6.4'
target = messaging.Target(version=RPC_API_VERSION)
@ -1326,40 +1326,6 @@ class Service(service.RPCService):
return reports
@rpc.expected_exceptions()
@notification('dns.zone.touch')
@synchronized_zone()
def touch_zone(self, context, zone_id):
zone = self.storage.get_zone(context, zone_id)
if policy.enforce_new_defaults():
target = {
'zone_id': zone_id,
'zone_name': zone.name,
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'tenant_id': zone.tenant_id
}
policy.check('touch_zone', context, target)
self._touch_zone_in_storage(context, zone)
self.zone_api.update_zone(context, zone)
return zone
@transaction
def _touch_zone_in_storage(self, context, zone):
zone = self._increment_zone_serial(context, zone)
return zone
# RecordSet Methods
@rpc.expected_exceptions()
@notification('dns.recordset.create')
@ -1703,69 +1669,6 @@ class Service(service.RPCService):
return self.storage.count_recordsets(context, criterion)
# Record Methods
@rpc.expected_exceptions()
@notification('dns.record.create')
@synchronized_zone()
def create_record(self, context, zone_id, recordset_id, record,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id)
# Don't allow updates to zones that are being deleted
if zone.action == 'DELETE':
raise exceptions.BadRequest('Can not update a deleting zone')
recordset = self.storage.get_recordset(context, recordset_id)
if policy.enforce_new_defaults():
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
'tenant_id': zone.tenant_id
}
policy.check('create_record', context, target)
record, zone = self._create_record_in_storage(
context, zone, recordset, record,
increment_serial=increment_serial)
self.zone_api.update_zone(context, zone)
return record
@transaction
def _create_record_in_storage(self, context, zone, recordset, record,
increment_serial=True):
# Ensure the tenant has enough quota to continue
self._enforce_record_quota(context, zone, recordset)
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
record.action = 'CREATE'
record.status = 'PENDING'
record.serial = zone.serial
record = self.storage.create_record(context, zone.id, recordset.id,
record)
return record, zone
@rpc.expected_exceptions()
def get_record(self, context, zone_id, recordset_id, record_id):
zone = self.storage.get_zone(context, zone_id)
@ -1827,158 +1730,6 @@ class Service(service.RPCService):
return self.storage.find_record(context, criterion)
@rpc.expected_exceptions()
@notification('dns.record.update')
@synchronized_zone()
def update_record(self, context, record, increment_serial=True):
zone_id = record.obj_get_original_value('zone_id')
zone = self.storage.get_zone(context, zone_id)
# Don't allow updates to zones that are being deleted
if zone.action == 'DELETE':
raise exceptions.BadRequest('Can not update a deleting zone')
recordset_id = record.obj_get_original_value('recordset_id')
recordset = self.storage.get_recordset(context, recordset_id)
changes = record.obj_get_changes()
# Ensure immutable fields are not changed
if 'tenant_id' in changes:
raise exceptions.BadRequest('Moving a recordset between tenants '
'is not allowed')
if 'zone_id' in changes:
raise exceptions.BadRequest('Moving a recordset between zones '
'is not allowed')
if 'recordset_id' in changes:
raise exceptions.BadRequest('Moving a recordset between '
'recordsets is not allowed')
if policy.enforce_new_defaults():
target = {
'zone_id': record.obj_get_original_value('zone_id'),
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': record.obj_get_original_value('recordset_id'),
'recordset_name': recordset.name,
'record_id': record.obj_get_original_value('id'),
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': record.obj_get_original_value('zone_id'),
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': record.obj_get_original_value('recordset_id'),
'recordset_name': recordset.name,
'record_id': record.obj_get_original_value('id'),
'tenant_id': zone.tenant_id
}
policy.check('update_record', context, target)
if recordset.managed and not context.edit_managed_records:
raise exceptions.BadRequest('Managed records may not be updated')
record, zone = self._update_record_in_storage(
context, zone, record, increment_serial=increment_serial)
self.zone_api.update_zone(context, zone)
return record
@transaction
def _update_record_in_storage(self, context, zone, record,
increment_serial=True):
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
record.action = 'UPDATE'
record.status = 'PENDING'
record.serial = zone.serial
# Update the record
record = self.storage.update_record(context, record)
return record, zone
@rpc.expected_exceptions()
@notification('dns.record.delete')
@synchronized_zone()
def delete_record(self, context, zone_id, recordset_id, record_id,
increment_serial=True):
zone = self.storage.get_zone(context, zone_id)
# Don't allow updates to zones that are being deleted
if zone.action == 'DELETE':
raise exceptions.BadRequest('Can not update a deleting zone')
recordset = self.storage.get_recordset(context, recordset_id)
record = self.storage.get_record(context, record_id)
# Ensure the zone_id matches the record's zone_id
if zone.id != record.zone_id:
raise exceptions.RecordNotFound()
# Ensure the recordset_id matches the record's recordset_id
if recordset.id != record.recordset_id:
raise exceptions.RecordNotFound()
if policy.enforce_new_defaults():
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
'record_id': record.id,
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'zone_type': zone.type,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
'record_id': record.id,
'tenant_id': zone.tenant_id
}
policy.check('delete_record', context, target)
if recordset.managed and not context.edit_managed_records:
raise exceptions.BadRequest('Managed records may not be deleted')
record, zone = self._delete_record_in_storage(
context, zone, record, increment_serial=increment_serial)
self.zone_api.update_zone(context, zone)
return record
@transaction
def _delete_record_in_storage(self, context, zone, record,
increment_serial=True):
if increment_serial:
# update the zone's status and increment the serial
zone = self._update_zone_in_storage(
context, zone, increment_serial)
record.action = 'DELETE'
record.status = 'PENDING'
record.serial = zone.serial
record = self.storage.update_record(context, record)
return record, zone
@rpc.expected_exceptions()
def count_records(self, context, criterion=None):
if criterion is None:
@ -1994,101 +1745,6 @@ class Service(service.RPCService):
policy.check('count_records', context, target)
return self.storage.count_records(context, criterion)
# Diagnostics Methods
def _sync_zone(self, context, zone):
return self.zone_api.update_zone(context, zone)
@rpc.expected_exceptions()
@transaction
def sync_zones(self, context):
policy.check('diagnostics_sync_zones', context)
zones = self.storage.find_zones(context)
results = {}
for zone in zones:
results[zone.id] = self._sync_zone(context, zone)
return results
@rpc.expected_exceptions()
@transaction
def sync_zone(self, context, zone_id):
zone = self.storage.get_zone(context, zone_id)
if policy.enforce_new_defaults():
target = {
'zone_id': zone_id,
'zone_name': zone.name,
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'tenant_id': zone.tenant_id
}
policy.check('diagnostics_sync_zone', context, target)
return self._sync_zone(context, zone)
@rpc.expected_exceptions()
@transaction
def sync_record(self, context, zone_id, recordset_id, record_id):
zone = self.storage.get_zone(context, zone_id)
recordset = self.storage.get_recordset(context, recordset_id)
if policy.enforce_new_defaults():
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
'record_id': record_id,
constants.RBAC_PROJECT_ID: zone.tenant_id
}
else:
target = {
'zone_id': zone_id,
'zone_name': zone.name,
'recordset_id': recordset_id,
'recordset_name': recordset.name,
'record_id': record_id,
'tenant_id': zone.tenant_id
}
policy.check('diagnostics_sync_record', context, target)
self.zone_api.update_zone(context, zone)
@rpc.expected_exceptions()
def ping(self, context):
policy.check('diagnostics_ping', context)
# TODO(Ron): Handle this method properly.
try:
backend_status = {'status': None}
except Exception as e:
backend_status = {'status': False, 'message': str(e)}
try:
storage_status = self.storage.ping(context)
except Exception as e:
storage_status = {'status': False, 'message': str(e)}
if backend_status and storage_status:
status = True
else:
status = False
return {
'host': cfg.CONF.host,
'status': status,
'backend': backend_status,
'storage': storage_status
}
def _determine_floatingips(self, context, fips, project_id=None):
"""
Given the context or project, and fips it returns the valid

View File

@ -20,7 +20,6 @@ import itertools
from designate.common.policies import base
from designate.common.policies import blacklist
from designate.common.policies import context
from designate.common.policies import diagnostics
from designate.common.policies import pool
from designate.common.policies import quota
from designate.common.policies import record
@ -41,7 +40,6 @@ def list_rules():
base.list_rules(),
blacklist.list_rules(),
context.list_rules(),
diagnostics.list_rules(),
pool.list_rules(),
quota.list_rules(),
record.list_rules(),

View File

@ -1,75 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
from oslo_policy import policy
from designate.common.policies import base
deprecated_diagnostics_ping = policy.DeprecatedRule(
name="diagnostics_ping",
check_str=base.RULE_ADMIN,
deprecated_reason=base.DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
deprecated_diagnostics_sync_zones = policy.DeprecatedRule(
name="diagnostics_sync_zones",
check_str=base.RULE_ADMIN,
deprecated_reason=base.DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
deprecated_diagnostics_sync_zone = policy.DeprecatedRule(
name="diagnostics_sync_zone",
check_str=base.RULE_ADMIN,
deprecated_reason=base.DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
deprecated_diagnostics_sync_record = policy.DeprecatedRule(
name="diagnostics_sync_record",
check_str=base.RULE_ADMIN,
deprecated_reason=base.DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
rules = [
policy.RuleDefault(
name="diagnostics_ping",
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
description='Diagnose ping.',
deprecated_rule=deprecated_diagnostics_ping),
policy.RuleDefault(
name="diagnostics_sync_zones",
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
description='Diagnose sync zones.',
deprecated_rule=deprecated_diagnostics_sync_zones),
policy.RuleDefault(
name="diagnostics_sync_zone",
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
description='Diagnose sync zone.',
deprecated_rule=deprecated_diagnostics_sync_zone),
policy.RuleDefault(
name="diagnostics_sync_record",
check_str=base.SYSTEM_ADMIN,
scope_types=['system'],
description='Diagnose sync record.',
deprecated_rule=deprecated_diagnostics_sync_record)
]
def list_rules():
return rules

View File

@ -100,12 +100,6 @@ deprecated_purge_zones = policy.DeprecatedRule(
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
deprecated_touch_zone = policy.DeprecatedRule(
name="touch_zone",
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.WALLABY
)
rules = [
@ -243,12 +237,6 @@ rules = [
scope_types=['system'],
deprecated_rule=deprecated_purge_zones
),
policy.RuleDefault(
name="touch_zone",
check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
scope_types=['system', 'project'],
deprecated_rule=deprecated_touch_zone
)
]

View File

@ -758,12 +758,6 @@ class Storage(DriverPlugin, metaclass=abc.ABCMeta):
:param zone_export_id: Delete a Zone Export via ID
"""
def ping(self, context):
"""Ping the Storage connection"""
return {
'status': None
}
@abc.abstractmethod
def find_service_statuses(self, context, criterion=None, marker=None,
limit=None, sort_key=None, sort_dir=None):

View File

@ -13,8 +13,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from oslo_utils.secretutils import md5
from sqlalchemy import select, distinct, func
@ -1785,22 +1783,6 @@ class SQLAlchemyStorage(sqlalchemy_base.SQLAlchemy, storage_base.Storage):
exceptions.DuplicateServiceStatus,
exceptions.ServiceStatusNotFound)
# diagnostics
def ping(self, context):
start_time = time.time()
try:
result = self.engine.execute('SELECT 1').first()
except Exception:
status = False
else:
status = True if result[0] == 1 else False
return {
'status': status,
'rtt': "%f" % (time.time() - start_time)
}
# Reverse Name utils
def _rname_check(self, criterion):
# If the criterion has 'name' in it, switch it out for reverse_name

View File

@ -512,11 +512,11 @@ class TestCase(base.BaseTestCase):
return _values
def get_recordset_fixture(self, zone_name, type='A', fixture=0,
def get_recordset_fixture(self, zone_name, recordset_type='A', fixture=0,
values=None):
values = values or {}
_values = copy.copy(self.recordset_fixtures[type][fixture])
_values = copy.copy(self.recordset_fixtures[recordset_type][fixture])
_values.update(values)
try:
@ -693,31 +693,30 @@ class TestCase(base.BaseTestCase):
return self.central_service.create_zone(
context, objects.Zone.from_dict(values))
def create_recordset(self, zone, type='A', increment_serial=True,
**kwargs):
def create_recordset(self, zone, recordset_type='A', records=None,
increment_serial=True, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_recordset_fixture(zone['name'], type=type,
fixture=fixture,
values=kwargs)
values = self.get_recordset_fixture(
zone['name'],
recordset_type=recordset_type, fixture=fixture, values=kwargs
)
recordset = objects.RecordSet.from_dict(values)
if records is None:
recordset.records = [
objects.Record.from_dict(
self.get_record_fixture(recordset_type=recordset_type)
)
]
else:
recordset.records = records
return self.central_service.create_recordset(
context, zone['id'], objects.RecordSet.from_dict(values),
increment_serial=increment_serial)
def create_record(self, zone, recordset, increment_serial=True,
**kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_record_fixture(recordset['type'], fixture=fixture,
values=kwargs)
return self.central_service.create_record(
context, zone['id'], recordset['id'],
objects.Record.from_dict(values),
increment_serial=increment_serial)
context, zone['id'], recordset,
increment_serial=increment_serial
)
def create_blacklist(self, **kwargs):
context = kwargs.pop('context', self.admin_context)

View File

@ -20,6 +20,7 @@ import oslo_messaging as messaging
from designate.central import service as central_service
from designate import exceptions
from designate import objects
from designate.tests.test_api.test_v2 import ApiV2TestCase
LOG = logging.getLogger(__name__)
@ -370,7 +371,7 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_get_deleted_recordsets(self):
zone = self.create_zone(fixture=1)
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
url = '/zones/%s/recordsets' % zone['id']
response = self.client.get(url)
@ -420,7 +421,7 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_get_recordset(self):
# Create a recordset
recordset = self.create_recordset(self.zone)
recordset = self.create_recordset(self.zone, records=[])
url = '/zones/%s/recordsets/%s' % (self.zone['id'], recordset['id'])
response = self.client.get(url)
@ -466,7 +467,7 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_update_recordset(self):
# Create a recordset
recordset = self.create_recordset(self.zone)
recordset = self.create_recordset(self.zone, records=[])
# Prepare an update body
body = {'description': 'Tester'}
@ -503,7 +504,7 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_update_recordset_with_record_create(self):
# Create a recordset
recordset = self.create_recordset(self.zone, 'A')
recordset = self.create_recordset(self.zone, 'A', records=[])
# The action and status are NONE and ACTIVE as there are no records
self.assertEqual('NONE', recordset['action'])
@ -543,7 +544,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_update_recordset_with_record_replace(self):
# Create a recordset with one record
recordset = self.create_recordset(self.zone, 'A')
self.create_record(self.zone, recordset)
# Prepare an update body
body = {'description': 'Tester',
@ -577,7 +577,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
# See bug #1474012
new_zone = self.create_zone(name='example.net.')
recordset = self.create_recordset(new_zone, 'TXT')
self.create_record(new_zone, recordset)
body = {'description': 'Tester', 'records': ['a' * 255]}
url = '/zones/%s/recordsets/%s' % (recordset['zone_id'],
@ -588,7 +587,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
# See bug #1474012
new_zone = self.create_zone(name='example.net.')
recordset = self.create_recordset(new_zone, 'TXT')
self.create_record(new_zone, recordset)
body = {'description': 'Tester', 'records': ['a' * 512]}
url = '/zones/%s/recordsets/%s' % (recordset['zone_id'],
recordset['id'])
@ -599,7 +597,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
# create TXT record with string split in 2
new_zone = self.create_zone(name='example.net.')
recordset = self.create_recordset(new_zone, 'TXT')
self.create_record(new_zone, recordset)
record = '"{}" "{}"'.format('a' * 250, 'a' * 250)
body = {'description': 'Tester', 'records': [record]}
url = '/zones/%s/recordsets/%s' % (recordset['zone_id'],
@ -609,7 +606,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_update_recordset_with_record_clear(self):
# Create a recordset with one record
recordset = self.create_recordset(self.zone, 'A')
self.create_record(self.zone, recordset)
# Prepare an update body
body = {'description': 'Tester', 'records': []}
@ -745,7 +741,7 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
self.client.put_json, url, body)
def test_delete_recordset(self):
recordset = self.create_recordset(self.zone)
recordset = self.create_recordset(self.zone, records=[])
url = '/zones/%s/recordsets/%s' % (recordset['zone_id'],
recordset['id'])
@ -770,7 +766,6 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
def test_delete_recordset_with_records(self):
# Create a recordset with one record
recordset = self.create_recordset(self.zone, 'A')
self.create_record(self.zone, recordset)
url = '/zones/%s/recordsets/%s' % (recordset['zone_id'],
recordset['id'])
@ -854,11 +849,16 @@ class ApiV2RecordSetsTest(ApiV2TestCase):
# Test paging
new_zone = self.create_zone(name='example.net.')
recordset = self.create_recordset(new_zone, 'A')
self.create_record(new_zone, recordset, data='nyan')
recordset = self.create_recordset(new_zone, 'CNAME')
self.create_record(new_zone, recordset, data='nyan')
record_1 = objects.Record.from_dict({'data': 'nyan'})
self.create_recordset(
new_zone, 'A',
records=[record_1],
)
record_2 = objects.Record.from_dict({'data': 'nyan'})
self.create_recordset(
new_zone, 'CNAME',
records=[record_2],
)
# Even with paging enabled, total_count is still the total number of
# recordsets matching the "data" filter

View File

@ -1314,21 +1314,6 @@ class CentralServiceTest(CentralTestCase):
else:
raise Exception("Unexpected zone %r" % z)
def test_touch_zone(self):
# Create a zone
expected_zone = self.create_zone()
# Touch the zone
self.central_service.touch_zone(
self.admin_context, expected_zone['id'])
# Fetch the zone again
zone = self.central_service.get_zone(
self.admin_context, expected_zone['id'])
# Ensure the serial was incremented
self.assertGreater(zone['serial'], expected_zone['serial'])
def test_xfr_zone(self):
# Create a zone
fixture = self.get_zone_fixture('SECONDARY', 0)
@ -1569,10 +1554,13 @@ class CentralServiceTest(CentralTestCase):
def test_get_recordset_with_records(self):
zone = self.create_zone()
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
# Create a recordset and two records
recordset = self.create_recordset(zone)
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
recordset = self.create_recordset(zone, records=records)
# Retrieve it, and ensure it's the same
recordset = self.central_service.get_recordset(
@ -1649,9 +1637,12 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone()
# Create a recordset
recordset = self.create_recordset(zone)
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
# Retrieve it, and ensure it's the same
criterion = {'zone_id': zone.id, 'name': recordset.name}
@ -1729,7 +1720,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone()
# Create a recordset
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
# Append two new Records
recordset.records.append(objects.Record(data='192.0.2.1'))
@ -1753,13 +1744,12 @@ class CentralServiceTest(CentralTestCase):
original_serial = zone.serial
# Create a recordset and two records
recordset = self.create_recordset(zone)
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
# Append two new Records
recordset.records.append(objects.Record(data='192.0.2.1'))
recordset.records.append(objects.Record(data='192.0.2.2'))
recordset = self.create_recordset(zone, records=records)
# Remove one of the Records
recordset.records.pop(0)
@ -1769,7 +1759,8 @@ class CentralServiceTest(CentralTestCase):
# Fetch the RecordSet again
recordset = self.central_service.get_recordset(
self.admin_context, zone.id, recordset.id)
self.admin_context, zone.id, recordset.id
)
# Fetch the Zone again
updated_zone = self.central_service.get_zone(self.admin_context,
@ -1786,9 +1777,11 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone()
# Create a recordset and two records
recordset = self.create_recordset(zone, 'A')
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
# Fetch the RecordSet again
recordset = self.central_service.get_recordset(
@ -1870,7 +1863,7 @@ class CentralServiceTest(CentralTestCase):
# 'SSHFP', 'SOA', 'NAPTR', 'CAA', 'CERT']
# Create a recordset
recordset = self.create_recordset(zone)
cname_recordset = self.create_recordset(zone, type='CNAME')
cname_recordset = self.create_recordset(zone, recordset_type='CNAME')
self.assertRaises(ovo_exc.ReadOnlyFieldError, setattr,
recordset, 'type', cname_recordset.type)
@ -1970,37 +1963,23 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(exceptions.Forbidden, exc.exc_info[0])
# Record Tests
def test_create_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
values = dict(
data='127.0.0.1'
)
# Create a record
record = self.central_service.create_record(
self.admin_context, zone['id'], recordset['id'],
objects.Record.from_dict(values))
# Ensure all values have been set correctly
self.assertIsNotNone(record['id'])
self.assertEqual(values['data'], record['data'])
self.assertIn('status', record)
def test_create_record_and_update_over_zone_quota(self):
# SOA and NS Records exist
self.config(quota_zone_records=0)
# Creating the zone automatically creates SOA & NS records
zone = self.create_zone()
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
self.create_record(zone, recordset)
recordset.records.append(
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.create_record,
zone, recordset)
exc = self.assertRaises(
rpc_dispatcher.ExpectedException,
self.central_service.update_recordset,
self.admin_context, recordset
)
self.assertEqual(exceptions.OverQuota, exc.exc_info[0])
@ -2031,42 +2010,22 @@ class CentralServiceTest(CentralTestCase):
# Creating the zone automatically creates SOA & NS records
zone = self.create_zone()
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
self.create_record(zone, recordset)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.create_record,
zone, recordset)
self.assertEqual(exceptions.OverQuota, exc.exc_info[0])
def test_create_record_without_incrementing_serial(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
values = dict(
data='127.0.0.1'
recordset.records.append(
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
)
# Create a record
self.central_service.create_record(
self.admin_context, zone['id'], recordset['id'],
objects.Record.from_dict(values),
increment_serial=False)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.update_recordset,
self.admin_context, recordset)
# Ensure the zones serial number was not updated
updated_zone = self.central_service.get_zone(
self.admin_context, zone['id'])
self.assertEqual(zone['serial'], updated_zone['serial'])
self.assertEqual(exceptions.OverQuota, exc.exc_info[0])
def test_get_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
# Retrieve it, and ensure it's the same
record = self.central_service.get_record(
@ -2080,9 +2039,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_zone = self.create_zone(fixture=1)
# Create a record
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.get_record,
@ -2096,9 +2053,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_recordset = self.create_recordset(zone, fixture=1)
# Create a record
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.get_record,
@ -2111,7 +2066,7 @@ class CentralServiceTest(CentralTestCase):
def test_find_records(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
criterion = {
'zone_id': zone['id'],
@ -2124,8 +2079,12 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(0, len(records))
# Create a single record (using default values)
expected_one = self.create_record(zone, recordset)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
]
recordset.records = records
expected_one = recordset.records[0]
self.central_service.update_recordset(self.admin_context, recordset)
# Ensure we can retrieve the newly created record
records = self.central_service.find_records(
@ -2135,22 +2094,21 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(expected_one['data'], records[0]['data'])
# Create a second record
expected_two = self.create_record(zone, recordset, fixture=1)
recordset.records.append(
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
)
self.central_service.update_recordset(self.admin_context, recordset)
# Ensure we can retrieve both records
records = self.central_service.find_records(
self.admin_context, criterion)
self.assertEqual(2, len(records))
self.assertEqual(expected_one['data'], records[0]['data'])
self.assertEqual(expected_two['data'], records[1]['data'])
def test_find_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
# Retrieve it, and ensure it's the same
criterion = {
@ -2166,200 +2124,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(expected['data'], record['data'])
self.assertIn('status', record)
def test_update_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, 'A')
# Create a record
record = self.create_record(zone, recordset)
# Update the Object
record.data = '192.0.2.255'
# Perform the update
self.central_service.update_record(self.admin_context, record)
# Fetch the resource again
record = self.central_service.get_record(
self.admin_context, record.zone_id, record.recordset_id,
record.id)
# Ensure the new value took
self.assertEqual('192.0.2.255', record.data)
def test_update_record_without_incrementing_serial(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, 'A')
# Create a record
record = self.create_record(zone, recordset)
# Fetch the zone so we have the latest serial number
zone_before = self.central_service.get_zone(
self.admin_context, zone.id)
# Update the Object
record.data = '192.0.2.255'
# Perform the update
self.central_service.update_record(
self.admin_context, record, increment_serial=False)
# Fetch the resource again
record = self.central_service.get_record(
self.admin_context, record.zone_id, record.recordset_id,
record.id)
# Ensure the new value took
self.assertEqual('192.0.2.255', record.data)
# Ensure the zones serial number was not updated
zone_after = self.central_service.get_zone(
self.admin_context, zone.id)
self.assertEqual(zone_before.serial, zone_after.serial)
def test_update_record_immutable_zone_id(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_zone = self.create_zone(fixture=1)
# Create a record
record = self.create_record(zone, recordset)
# Update the record
record.zone_id = other_zone.id
# Ensure we get a BadRequest if we change the zone_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.update_record,
self.admin_context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_update_record_immutable_recordset_id(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_recordset = self.create_recordset(zone, fixture=1)
# Create a record
record = self.create_record(zone, recordset)
# Update the record
record.recordset_id = other_recordset.id
# Ensure we get a BadRequest if we change the recordset_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.update_record,
self.admin_context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_delete_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
record = self.create_record(zone, recordset)
# Fetch the zone serial number
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
# Delete the record
self.central_service.delete_record(
self.admin_context, zone['id'], recordset['id'], record['id'])
# Ensure the zone serial number was updated
new_zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.assertNotEqual(new_zone_serial, zone_serial)
# Fetch the record
deleted_record = self.central_service.get_record(
self.admin_context, zone['id'], recordset['id'],
record['id'])
# Ensure the record is marked for deletion
self.assertEqual(record.id, deleted_record.id)
self.assertEqual(record.data, deleted_record.data)
self.assertEqual(record.zone_id, deleted_record.zone_id)
self.assertEqual('PENDING', deleted_record.status)
self.assertEqual(record.tenant_id, deleted_record.tenant_id)
self.assertEqual(record.recordset_id, deleted_record.recordset_id)
self.assertEqual('DELETE', deleted_record.action)
self.assertEqual(new_zone_serial, deleted_record.serial)
def test_delete_record_without_incrementing_serial(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
record = self.create_record(zone, recordset)
# Fetch the zone serial number
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
# Delete the record
self.central_service.delete_record(
self.admin_context, zone['id'], recordset['id'], record['id'],
increment_serial=False)
# Ensure the zones serial number was not updated
new_zone_serial = self.central_service.get_zone(
self.admin_context, zone['id'])['serial']
self.assertEqual(zone_serial, new_zone_serial)
# Fetch the record
deleted_record = self.central_service.get_record(
self.admin_context, zone['id'], recordset['id'],
record['id'])
# Ensure the record is marked for deletion
self.assertEqual(record.id, deleted_record.id)
self.assertEqual(record.data, deleted_record.data)
self.assertEqual(record.zone_id, deleted_record.zone_id)
self.assertEqual('PENDING', deleted_record.status)
self.assertEqual(record.tenant_id, deleted_record.tenant_id)
self.assertEqual(record.recordset_id, deleted_record.recordset_id)
self.assertEqual('DELETE', deleted_record.action)
self.assertEqual(new_zone_serial, deleted_record.serial)
def test_delete_record_incorrect_zone_id(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_zone = self.create_zone(fixture=1)
# Create a record
record = self.create_record(zone, recordset)
# Ensure we get a 404 if we use the incorrect zone_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.delete_record,
self.admin_context, other_zone['id'],
recordset['id'],
record['id'])
self.assertEqual(exceptions.RecordNotFound, exc.exc_info[0])
def test_delete_record_incorrect_recordset_id(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
other_recordset = self.create_recordset(zone, fixture=1)
# Create a record
record = self.create_record(zone, recordset)
# Ensure we get a 404 if we use the incorrect recordset_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.delete_record,
self.admin_context, zone['id'],
other_recordset['id'],
record['id'])
self.assertEqual(exceptions.RecordNotFound, exc.exc_info[0])
def test_count_records(self):
# in the beginning, there should be nothing
records = self.central_service.count_records(self.admin_context)
@ -2367,10 +2131,7 @@ class CentralServiceTest(CentralTestCase):
# Create a zone and recordset to put our record in
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
self.create_record(zone, recordset)
self.create_recordset(zone)
# we should have 1 record now, plus SOA & NS records
records = self.central_service.count_records(self.admin_context)
@ -3140,13 +2901,11 @@ class CentralServiceTest(CentralTestCase):
def test_update_status_delete_last_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
record = self.create_record(zone, recordset)
record = recordset.records[0]
# Delete the record
self.central_service.delete_record(
self.admin_context, zone['id'], recordset['id'], record['id'])
recordset.records = []
self.central_service.update_recordset(self.admin_context, recordset)
# Simulate the record having been deleted on the backend
zone_serial = self.central_service.get_zone(
@ -3161,7 +2920,7 @@ class CentralServiceTest(CentralTestCase):
recordset['id'],
record['id'])
self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0])
self.assertEqual(exceptions.RecordNotFound, exc.exc_info[0])
def test_update_status_create_zone(self):
zone = self.create_zone()
@ -3323,43 +3082,6 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual('example.com.', notified_zone.name)
self.assertEqual('ACTIVE', notified_zone.status)
def test_update_status_delete_last_record_without_incrementing_serial(
self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
record = self.create_record(zone, recordset)
# Fetch the zone serial number
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
# Delete the record
self.central_service.delete_record(
self.admin_context, zone['id'], recordset['id'], record['id'],
increment_serial=False)
# Simulate the record having been deleted on the backend
zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.central_service.update_status(
self.admin_context, zone['id'], 'SUCCESS', zone_serial, 'UPDATE')
# Fetch the record again, ensuring an exception is raised
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.central_service.get_record,
self.admin_context, zone['id'],
recordset['id'], record['id'])
self.assertEqual(exceptions.RecordSetNotFound, exc.exc_info[0])
# Ensure the zones serial number was not updated
new_zone_serial = self.central_service.get_zone(
self.admin_context, zone['id']).serial
self.assertEqual(zone_serial, new_zone_serial)
def test_create_new_service_status_entry(self):
values = self.get_service_status_fixture()
@ -3508,8 +3230,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone(context=tenant_1_context)
recordset = self.create_recordset(zone, context=tenant_1_context)
record = self.create_record(
zone, recordset, context=tenant_1_context)
record = recordset.records[0]
zone_transfer_request = self.create_zone_transfer_request(
zone, context=tenant_1_context)
@ -3559,8 +3280,7 @@ class CentralServiceTest(CentralTestCase):
zone = self.create_zone(context=tenant_1_context)
recordset = self.create_recordset(zone, context=tenant_1_context)
record = self.create_record(
zone, recordset, context=tenant_1_context)
record = recordset.records[0]
zone_transfer_request = self.create_zone_transfer_request(
zone,

View File

@ -403,8 +403,7 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# This creates an A record for mail.example.com
zone = self.create_zone()
recordset = self.create_recordset(zone, 'A')
self.create_record(zone, recordset)
self.create_recordset(zone, 'A')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
@ -434,8 +433,7 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# This creates an TXT record for mail.example.com
zone = self.create_zone()
recordset = self.create_recordset(zone, 'TXT')
self.create_record(zone, recordset)
self.create_recordset(zone, 'TXT')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
@ -462,7 +460,8 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# ;ADDITIONAL
zone = self.create_zone()
recordset = self.create_recordset(zone, type='TXT')
recordset = self.create_recordset(zone, recordset_type='TXT',
records=[])
values = {'data': '"foo" "bar" "blah"'}
self.storage.create_record(
self.admin_context, zone['id'], recordset['id'],
@ -495,8 +494,7 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# This creates an MX record for mail.example.com
zone = self.create_zone()
recordset = self.create_recordset(zone, 'MX')
self.create_record(zone, recordset)
self.create_recordset(zone, 'MX')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
@ -901,8 +899,7 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# This creates an MX record for mail.example.com
# But we query for a CNAME record
zone = self.create_zone()
recordset = self.create_recordset(zone, 'MX')
self.create_record(zone, recordset)
self.create_recordset(zone, 'MX')
request = dns.message.from_wire(binascii.a2b_hex(payload))
request.environ = {'addr': self.addr, 'context': self.context}
@ -936,10 +933,9 @@ class MdnsRequestHandlerTest(MdnsTestCase):
def test_dispatch_opcode_query_tsig_scope_pool(self):
# Create a zone/recordset/record to query
zone = self.create_zone(name='example.com.')
recordset = self.create_recordset(
zone, name='example.com.', type='A')
self.create_record(
zone, recordset, data='192.0.2.5')
self.create_recordset(
zone, name='example.com.', recordset_type='A'
)
# DNS packet with QUERY opcode for A example.com.
payload = ("c28901200001000000000001076578616d706c6503636f6d0000010001"
@ -965,9 +961,10 @@ class MdnsRequestHandlerTest(MdnsTestCase):
# example.com. 3600 IN A 192.0.2.5
# ;AUTHORITY
# ;ADDITIONAL
expected_response = (b"c28985000001000100000001076578616d706c6503636f"
b"6d0000010001c00c0001000100000e100004c000020500"
b"00292000000000000000")
expected_response = (
b'c28985000001000100000001076578616d706c6503636f6d0000010001c00c00'
b'01000100000e100004c00002010000292000000000000000'
)
response = next(self.handler(request)).to_wire()
@ -996,10 +993,12 @@ class MdnsRequestHandlerTest(MdnsTestCase):
def test_dispatch_opcode_query_tsig_scope_zone(self):
# Create a zone/recordset/record to query
zone = self.create_zone(name='example.com.')
recordset = self.create_recordset(
zone, name='example.com.', type='A')
self.create_record(
zone, recordset, data='192.0.2.5')
records = [
objects.Record.from_dict({'data': '192.0.2.5'}),
]
self.create_recordset(
zone, name='example.com.', recordset_type='A', records=records
)
# Create a TSIG Key Matching the zone
tsigkey_zone_known = self.create_tsigkey(

View File

@ -231,7 +231,7 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
with mock.patch.object(
self.plugin, '_create_or_update_recordset') as finder:
with mock.patch.object(self.plugin.central_api,
'create_record'):
'create_recordset'):
finder.return_value = {'id': 'fakeid'}
self.plugin.process_notification(
self.admin_context.to_dict(),
@ -248,7 +248,7 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
with mock.patch.object(
self.plugin, '_create_or_update_recordset') as finder:
with mock.patch.object(self.plugin.central_api,
'create_record'):
'create_recordset'):
finder.return_value = {'id': 'fakeid'}
self.plugin.process_notification(
self.admin_context.to_dict(),
@ -265,7 +265,7 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
with mock.patch.object(
self.plugin, '_create_or_update_recordset') as finder:
with mock.patch.object(self.plugin.central_api,
'create_record'):
'create_recordset'):
finder.return_value = {'id': 'fakeid'}
self.plugin.process_notification(
self.admin_context.to_dict(),

View File

@ -1003,9 +1003,9 @@ class StorageTestCase(object):
zone = self.create_zone()
records = [
{"data": "10.0.0.1"},
{"data": "10.0.0.2"},
{"data": "10.0.0.3"}
objects.Record.from_dict({"data": "10.0.0.1"}),
objects.Record.from_dict({"data": "10.0.0.2"}),
objects.Record.from_dict({"data": "10.0.0.3"})
]
recordset = self.create_recordset(zone, records=records)
@ -1045,11 +1045,12 @@ class StorageTestCase(object):
def test_get_recordset_with_records(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create two Records in the RecordSet
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
# Fetch the RecordSet again
recordset = self.storage.get_recordset(
@ -1095,11 +1096,12 @@ class StorageTestCase(object):
def test_find_recordset_criterion_with_records(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create two Records in the RecordSet
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
criterion = dict(
id=recordset.id,
@ -1162,7 +1164,7 @@ class StorageTestCase(object):
zone = self.create_zone()
# Create a RecordSet
recordset = self.create_recordset(zone, 'A')
recordset = self.create_recordset(zone, 'A', records=[])
# Append two new Records
recordset.records.append(objects.Record(data='192.0.2.1'))
@ -1188,9 +1190,11 @@ class StorageTestCase(object):
zone = self.create_zone()
# Create a RecordSet and two Records
recordset = self.create_recordset(zone, 'A')
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
# Fetch the RecordSet again
recordset = self.storage.get_recordset(
@ -1217,9 +1221,11 @@ class StorageTestCase(object):
zone = self.create_zone()
# Create a RecordSet and two Records
recordset = self.create_recordset(zone, 'A')
self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
# Fetch the RecordSet again
recordset = self.storage.get_recordset(
@ -1288,43 +1294,9 @@ class StorageTestCase(object):
recordsets = self.storage.count_recordsets(self.admin_context)
self.assertEqual(0, recordsets)
def test_create_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
values = {
'data': '192.0.2.1',
}
result = self.storage.create_record(
self.admin_context, zone['id'], recordset['id'],
objects.Record.from_dict(values))
self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at'])
self.assertIsNotNone(result['hash'])
self.assertIsNone(result['updated_at'])
self.assertEqual(self.admin_context.project_id, result['tenant_id'])
self.assertEqual(values['data'], result['data'])
self.assertIn('status', result)
def test_create_record_duplicate(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create the First Record
self.create_record(zone, recordset)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.create_record,
zone, recordset)
self.assertEqual(exceptions.DuplicateRecord, exc.exc_info[0])
def test_find_records(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
recordset = self.create_recordset(zone, records=[])
criterion = {
'zone_id': zone['id'],
@ -1335,21 +1307,33 @@ class StorageTestCase(object):
self.assertEqual(0, len(actual))
# Create a single record
record = self.create_record(zone, recordset)
records = [
objects.Record.from_dict(self.get_record_fixture('A', fixture=0)),
]
recordset.records = records
self.central_service.update_recordset(self.admin_context, recordset)
recordset = self.central_service.get_recordset(
self.admin_context, zone['id'], recordset['id']
)
record = recordset.records[0]
actual = self.storage.find_records(self.admin_context, criterion)
self.assertEqual(1, len(actual))
self.assertEqual(record['data'], actual[0]['data'])
self.assertIn('status', record)
def test_find_records_paging(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
# Create 10 Records
created = [self.create_record(zone, recordset, data='192.0.2.%d' % i)
for i in range(10)]
records = []
for i in range(10):
records.append(
objects.Record.from_dict(({'data': '192.0.2.%d' % i}))
)
self.create_recordset(zone, type='A', records=records)
# Add in the SOA and NS records that are automatically created
soa = self.storage.find_recordset(self.admin_context,
@ -1359,18 +1343,22 @@ class StorageTestCase(object):
criterion={'zone_id': zone['id'],
'type': "NS"})
for r in ns['records']:
created.insert(0, r)
created.insert(0, soa['records'][0])
records.insert(0, r)
records.insert(0, soa['records'][0])
# Ensure we can page through the results.
self._ensure_paging(created, self.storage.find_records)
self._ensure_paging(records, self.storage.find_records)
def test_find_records_criterion(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
record_one = self.create_record(zone, recordset)
self.create_record(zone, recordset, fixture=1)
record_one = objects.Record.from_dict(
self.get_record_fixture('A', fixture=0)
)
records = [
record_one,
objects.Record.from_dict(self.get_record_fixture('A', fixture=1))
]
recordset = self.create_recordset(zone, records=records)
criterion = dict(
data=record_one['data'],
@ -1392,11 +1380,9 @@ class StorageTestCase(object):
def test_find_records_criterion_wildcard(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
values = {'data': '127.0.0.1'}
self.create_record(zone, recordset, **values)
records = [objects.Record.from_dict({'data': '127.0.0.1'})]
recordset = self.create_recordset(zone, type='A', records=records)
criterion = dict(
zone_id=zone['id'],
@ -1408,52 +1394,10 @@ class StorageTestCase(object):
self.assertEqual(1, len(results))
def test_find_records_all_tenants(self):
# Create two contexts with different tenant_id's
one_context = self.get_admin_context()
one_context.project_id = 1
two_context = self.get_admin_context()
two_context.project_id = 2
# Create normal and all_tenants context objects
nm_context = self.get_admin_context()
at_context = self.get_admin_context()
at_context.all_tenants = True
# Create two zones in different tenants, and 1 record in each
zone_one = self.create_zone(fixture=0, context=one_context)
recordset_one = self.create_recordset(zone_one, fixture=0,
context=one_context)
self.create_record(zone_one, recordset_one, context=one_context)
zone_two = self.create_zone(fixture=1, context=two_context)
recordset_one = self.create_recordset(zone_two, fixture=1,
context=two_context)
self.create_record(zone_two, recordset_one, context=two_context)
# Ensure the all_tenants context see's two records
# Plus the SOA & NS in each of 2 zones = 6 records total
results = self.storage.find_records(at_context)
self.assertEqual(6, len(results))
# Ensure the normal context see's no records
results = self.storage.find_records(nm_context)
self.assertEqual(0, len(results))
# Ensure the tenant 1 context see's 1 record + SOA & NS
results = self.storage.find_records(one_context)
self.assertEqual(3, len(results))
# Ensure the tenant 2 context see's 1 record + SOA & NS
results = self.storage.find_records(two_context)
self.assertEqual(3, len(results))
def test_get_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
actual = self.storage.get_record(self.admin_context, expected['id'])
@ -1468,8 +1412,7 @@ class StorageTestCase(object):
def test_find_record_criterion(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
criterion = dict(
zone_id=zone['id'],
@ -1485,8 +1428,7 @@ class StorageTestCase(object):
def test_find_record_criterion_missing(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
expected = self.create_record(zone, recordset)
expected = recordset.records[0]
criterion = dict(
zone_id=zone['id'],
@ -1499,9 +1441,7 @@ class StorageTestCase(object):
def test_update_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone, type='A')
# Create a record
record = self.create_record(zone, recordset)
record = recordset.records[0]
# Update the Object
record.data = '192.0.2.255'
@ -1517,11 +1457,20 @@ class StorageTestCase(object):
def test_update_record_duplicate(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create two records
record_one = self.create_record(zone, recordset)
record_two = self.create_record(zone, recordset, fixture=1)
record_one = objects.Record.from_dict(
self.get_record_fixture('A', fixture=0)
)
record_two = objects.Record.from_dict(
self.get_record_fixture('A', fixture=1)
)
records = [
record_one,
record_two
]
self.create_recordset(zone, records=records)
# Update the R2 object to be a duplicate of R1
record_two.data = record_one.data
@ -1538,9 +1487,7 @@ class StorageTestCase(object):
def test_delete_record(self):
zone = self.create_zone()
recordset = self.create_recordset(zone)
# Create a record
record = self.create_record(zone, recordset)
record = recordset.records[0]
self.storage.delete_record(self.admin_context, record['id'])
@ -1559,8 +1506,7 @@ class StorageTestCase(object):
# Create a single zone & record
zone = self.create_zone()
recordset = self.create_recordset(zone)
self.create_record(zone, recordset)
self.create_recordset(zone)
# we should have 3 records now, including NS and SOA
records = self.storage.count_records(self.admin_context)
@ -1579,19 +1525,6 @@ class StorageTestCase(object):
records = self.storage.count_records(self.admin_context)
self.assertEqual(0, records)
def test_ping(self):
pong = self.storage.ping(self.admin_context)
self.assertTrue(pong['status'])
self.assertIsNotNone(pong['rtt'])
def test_ping_fail(self):
with mock.patch.object(self.storage.engine, "execute",
side_effect=Exception):
result = self.storage.ping(self.admin_context)
self.assertFalse(result['status'])
self.assertIsNotNone(result['rtt'])
# TLD Tests
def test_create_tld(self):
values = {
@ -3073,8 +3006,7 @@ class StorageTestCase(object):
zone = self.create_zone(context=tenant_1_context)
recordset = self.create_recordset(zone, context=tenant_1_context)
record = self.create_record(
zone, recordset, context=tenant_1_context)
record = recordset.records[0]
updated_zone = zone

View File

@ -13,8 +13,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_log import log as logging
from designate import storage
@ -30,14 +28,6 @@ class SqlalchemyStorageTest(StorageTestCase, TestCase):
self.storage = storage.get_storage('sqlalchemy')
def test_ping_negative(self):
with mock.patch.object(self.storage.engine, 'execute',
return_value=0):
pong = self.storage.ping(self.admin_context)
self.assertFalse(pong['status'])
self.assertIsNotNone(pong['rtt'])
def test_schema_table_names(self):
table_names = [
u'blacklists',

View File

@ -1106,22 +1106,6 @@ class CentralZoneTestCase(CentralBasic):
self.assertEqual(exceptions.ReportNotFound, exc.exc_info[0])
def test_touch_zone_with_worker_model(self):
self.service._touch_zone_in_storage = mock.Mock()
self.service.storage.get_zone.return_value = RoObject(
name='example.org.',
tenant_id='2',
)
with fx_worker:
self.service.touch_zone(self.context,
CentralZoneTestCase.zone__id)
self.assertTrue(designate.central.service.policy.check.called)
self.assertEqual(
'touch_zone',
designate.central.service.policy.check.call_args[0][0]
)
def test_get_recordset_not_found(self):
self.service.storage.get_zone.return_value = RoObject(
id=CentralZoneTestCase.zone__id,
@ -1516,82 +1500,6 @@ class CentralZoneTestCase(CentralBasic):
self.service.storage.count_recordsets.call_args[0][1]
)
def test_create_record_fail_on_delete(self):
self.service.storage.get_zone.return_value = RoObject(
action='DELETE',
id=CentralZoneTestCase.zone__id_2,
name='example.org.',
tenant_id='2',
type='foo',
)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.create_record,
self.context,
CentralZoneTestCase.zone__id,
CentralZoneTestCase.recordset__id,
RoObject())
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def _test_create_record(self):
self.service._create_record_in_storage = mock.Mock(
return_value=(None, None)
)
self.service.storage.get_zone.return_value = RoObject(
action='a',
id=CentralZoneTestCase.zone__id_2,
name='example.org.',
tenant_id='2',
type='foo',
)
self.service.storage.get_recordset.return_value = RoObject(
name='rs',
)
with fx_worker:
self.service.create_record(
self.context,
CentralZoneTestCase.zone__id,
CentralZoneTestCase.recordset__id,
RoObject())
self.assertTrue(
self.service.zone_api.update_zone.called)
n, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('create_record', n)
self.assertEqual({
'zone_id': CentralZoneTestCase.zone__id,
'zone_name': 'example.org.',
'zone_type': 'foo',
'recordset_id': CentralZoneTestCase.recordset__id,
'recordset_name': 'rs',
'project_id': '2'}, target)
def test_create_record_worker(self):
self._test_create_record()
def test__create_record_in_storage(self):
self.service._enforce_record_quota = mock.Mock()
self.service._create_record_in_storage(
self.context,
RoObject(id=CentralZoneTestCase.zone__id, serial=4),
RoObject(id=CentralZoneTestCase.recordset__id),
RwObject(
action='',
status='',
serial='',
),
increment_serial=False
)
create_record = self.service.storage.create_record
ctx, did, rid, record = create_record.call_args[0]
self.assertEqual(CentralZoneTestCase.zone__id, did)
self.assertEqual(CentralZoneTestCase.recordset__id, rid)
self.assertEqual('CREATE', record.action)
self.assertEqual('PENDING', record.status)
self.assertEqual(4, record.serial)
def test_get_record_not_found(self):
self.service.storage.get_zone.return_value = RoObject(
id=CentralZoneTestCase.zone__id_2,
@ -1669,296 +1577,12 @@ class CentralZoneTestCase(CentralBasic):
'recordset_name': 'foo',
'project_id': 2}, target)
def test_update_record_fail_on_changes(self):
self.service.storage.get_zone.return_value = RoObject(
action='a',
name='n',
type='t',
tenant_id='tid',
)
record = mock.Mock()
record.obj_get_original_value.return_value = 1
record.obj_get_changes.return_value = ['tenant_id', 'foo']
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.update_record,
self.context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
record.obj_get_changes.return_value = ['zone_id', 'foo']
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.update_record,
self.context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
record.obj_get_changes.return_value = ['recordset_id', 'foo']
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.update_record,
self.context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_update_record_action_delete(self):
self.service.storage.get_zone.return_value = RoObject(
action='DELETE',
)
record = mock.Mock()
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.update_record,
self.context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_update_record_action_fail_on_managed(self):
self.service.storage.get_zone.return_value = RoObject(
action='a',
name='n',
tenant_id='tid',
type='t',
)
self.service.storage.get_recordset.return_value = RoObject(
name='rsn',
managed=True
)
record = mock.Mock()
record.obj_get_changes.return_value = ['foo']
self.context = mock.Mock()
self.context.edit_managed_records = False
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.update_record,
self.context, record)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_update_record_worker(self):
self.service.storage.get_zone.return_value = RoObject(
action='a',
name='n',
tenant_id='tid',
type='t',
)
self.service.storage.get_recordset.return_value = RoObject(
name='rsn',
managed=False
)
record = mock.Mock()
record.obj_get_changes.return_value = ['foo']
record.obj_get_original_value.return_value =\
'abc12a-1e9d-4e99-aede-a06664f1af2e'
record.managed = False
self.service._update_record_in_storage = mock.Mock(
return_value=('x', 'y')
)
with fx_worker:
self.service.update_record(self.context, record)
self.assertTrue(self.service._update_record_in_storage.called)
n, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('update_record', n)
self.assertEqual({
'zone_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e',
'zone_name': 'n',
'zone_type': 't',
'record_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e',
'recordset_id': 'abc12a-1e9d-4e99-aede-a06664f1af2e',
'recordset_name': 'rsn',
'project_id': 'tid'}, target)
def test__update_record_in_storage(self):
self.service._update_zone_in_storage = mock.Mock()
self.service._update_record_in_storage(
self.context,
RoObject(serial=1),
RwObject(
action='',
status='',
serial='',
),
increment_serial=False
)
ctx, record = self.service.storage.update_record.call_args[0]
self.assertEqual('UPDATE', record.action)
self.assertEqual('PENDING', record.status)
self.assertEqual(1, record.serial)
def test_delete_record_action_delete(self):
self.service.storage.get_zone.return_value = RoObject(
action='DELETE',
)
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.delete_record,
self.context, 1, 2, 3)
self.assertEqual(exceptions.BadRequest, exc.exc_info[0])
def test_delete_record_not_found(self):
self.service.storage.get_zone.return_value = RoObject(
action='a',
id=CentralZoneTestCase.zone__id
)
self.service.storage.get_record.return_value = RoObject(
zone_id=CentralZoneTestCase.record__id_2,
)
self.service.storage.get_recordset.return_value = RoObject(
id=CentralZoneTestCase.recordset__id_2,
)
# zone.id != record.zone_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.delete_record,
self.context,
CentralZoneTestCase.zone__id,
CentralZoneTestCase.recordset__id,
CentralZoneTestCase.record__id)
self.assertEqual(exceptions.RecordNotFound, exc.exc_info[0])
self.service.storage.get_record.return_value = RoObject(
id=CentralZoneTestCase.record__id,
zone_id=CentralZoneTestCase.zone__id,
recordset_id=CentralZoneTestCase.recordset__id_3,
)
# recordset.id != record.recordset_id
exc = self.assertRaises(rpc_dispatcher.ExpectedException,
self.service.delete_record,
self.context,
CentralZoneTestCase.zone__id,
CentralZoneTestCase.recordset__id,
CentralZoneTestCase.record__id)
self.assertEqual(exceptions.RecordNotFound, exc.exc_info[0])
def test_delete_record_worker(self):
self.service._delete_record_in_storage = mock.Mock(
return_value=(None, None)
)
self.service.storage.get_zone.return_value = RoObject(
action='a',
id=CentralZoneTestCase.zone__id,
name='dn',
tenant_id='tid',
type='t',
)
self.service.storage.get_record.return_value = RoObject(
id=CentralZoneTestCase.record__id_2,
zone_id=CentralZoneTestCase.zone__id,
recordset_id=CentralZoneTestCase.recordset__id,
)
self.service.storage.get_recordset.return_value = RoObject(
name='rsn',
id=CentralZoneTestCase.recordset__id,
managed=False,
)
with fx_worker:
self.service.delete_record(self.context,
CentralZoneTestCase.zone__id_2,
CentralZoneTestCase.recordset__id_2,
CentralZoneTestCase.record__id)
t, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('delete_record', t)
self.assertEqual({
'zone_id': CentralZoneTestCase.zone__id_2,
'zone_name': 'dn',
'zone_type': 't',
'record_id': CentralZoneTestCase.record__id_2,
'recordset_id': CentralZoneTestCase.recordset__id_2,
'recordset_name': 'rsn',
'project_id': 'tid'}, target)
def test_delete_record_in_storage(self):
self.service._delete_record_in_storage(
self.context,
RoObject(serial=2),
RwObject(action='', status='', serial=''),
increment_serial=False
)
r = self.service.storage.update_record.call_args[0][1]
self.assertEqual('DELETE', r.action)
self.assertEqual('PENDING', r.status)
self.assertEqual(2, r.serial)
def test_count_records(self):
self.service.count_records(self.context)
t, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('count_records', t)
self.assertEqual({'project_id': None}, target)
def test_sync_zones(self):
self.service._sync_zone = mock.Mock()
self.service.storage.find_zones.return_value = [
RoObject(id=CentralZoneTestCase.zone__id),
RoObject(id=CentralZoneTestCase.zone__id_2)
]
res = self.service.sync_zones(self.context)
t, ctx = designate.central.service.policy.check.call_args[0]
self.assertEqual('diagnostics_sync_zones', t)
self.assertEqual(2, len(res))
def test_sync_zone(self):
self.service._sync_zone = mock.Mock()
self.service.storage.get_zone.return_value = RoObject(
id=CentralZoneTestCase.zone__id,
name='n',
tenant_id='tid',
)
self.service.sync_zone(self.context,
CentralZoneTestCase.zone__id)
t, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('diagnostics_sync_zone', t)
self.assertEqual({'project_id': 'tid',
'zone_id': CentralZoneTestCase.zone__id,
'zone_name': 'n'}, target)
def test_sync_record(self):
self.service.storage.get_zone.return_value = RoObject(
id=CentralZoneTestCase.zone__id,
name='n',
tenant_id='tid',
)
self.service.storage.get_recordset.return_value = RoObject(
name='n',
)
self.service.sync_record(
self.context, CentralZoneTestCase.zone__id,
CentralZoneTestCase.recordset__id,
CentralZoneTestCase.record__id)
t, ctx, target = designate.central.service.policy.check.call_args[0]
self.assertEqual('diagnostics_sync_record', t)
self.assertEqual({
'zone_id': CentralZoneTestCase.zone__id,
'zone_name': 'n',
'record_id': CentralZoneTestCase.record__id,
'recordset_id': CentralZoneTestCase.recordset__id,
'recordset_name': 'n',
'project_id': 'tid'}, target)
def test_ping(self):
self.service.storage.ping.return_value = True
r = self.service.ping(self.context)
self.assertEqual({'status': None}, r['backend'])
self.assertTrue(r['status'])
self.assertTrue(r['storage'])
def test_ping2(self):
self.service.storage.ping.return_value = False
r = self.service.ping(self.context)
self.assertEqual({'status': None}, r['backend'])
self.assertFalse(r['status'])
self.assertFalse(r['storage'])
def test_determine_floatingips(self):
self.context = mock.Mock()
self.context.project_id = 'tnt'

View File

@ -0,0 +1,15 @@
---
upgrade:
- |
Removed the following unused central rpc calls. This should not impact
normal installations, but if these are used in any custom written backends
or plugins that you are using, you will need to update your code before
upgrading.
- ``create_record``
- ``update_record``
- ``delete_record``
- ``sync_record``
- ``sync_zone``
- ``sync_zones``
- ``touch_zone``