Ensure the zone records quota is enforced

When creating a recordset, we only assert the recordset specific quota
is respected, ignoring the zone record quota.

This updates the code to enforce the records quota as well as excludes
the managed records when calculating the quota. Also, the submitted
records are taken into account when testing the quota. For example, if
the total number is 10 and the limit is 11 (the total must be less
than the quota), submitting an update to a recordset to lower the
number of A records from 5 to 3 should be allowed.

Cherry-pick from review.openstack.org/284361/

Closes-Bug: #1548331
Change-Id: I26839eca1c4901c5b8dbe3db69044c4da889b7d8
This commit is contained in:
Eric Larson 2016-03-03 14:04:42 -06:00
parent 8de1f180c2
commit b479cb7a3c
3 changed files with 134 additions and 9 deletions

View File

@ -641,19 +641,37 @@ class Service(service.RPCService, service.Service):
context, domain.tenant_id, domain_recordsets=count)
def _enforce_record_quota(self, context, domain, recordset):
# Quotas don't apply to managed records.
if recordset.managed:
return
# Ensure the records per domain quota is OK
criterion = {'domain_id': domain.id}
count = self.storage.count_records(context, criterion)
domain_criterion = {
'domain_id': domain.id,
'managed': False, # only include non-managed records
}
domain_records = self.storage.count_records(context, domain_criterion)
recordset_criterion = {
'recordset_id': recordset.id,
'managed': False, # only include non-managed records
}
recordset_records = self.storage.count_records(
context, recordset_criterion)
# We need to check the current number of domains + the
# changes that add, so lets get +/- from our recordset
# records based on the action
adjusted_domain_records = (
domain_records - recordset_records + len(recordset.records))
self.quota.limit_check(context, domain.tenant_id,
domain_records=count)
domain_records=adjusted_domain_records)
# Ensure the records per recordset quota is OK
criterion = {'recordset_id': recordset.id}
count = self.storage.count_records(context, criterion)
self.quota.limit_check(context, domain.tenant_id,
recordset_records=count)
recordset_records=recordset_records)
# Misc Methods
def get_absolute_limits(self, context):
@ -1195,6 +1213,11 @@ class Service(service.RPCService, service.Service):
self._is_valid_recordset_records(recordset)
if recordset.obj_attr_is_set('records') and len(recordset.records) > 0:
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, domain, recordset)
if increment_serial:
# update the zone's status and increment the serial
domain = self._update_domain_in_storage(
@ -1326,6 +1349,10 @@ class Service(service.RPCService, service.Service):
record.status = 'PENDING'
record.serial = domain.serial
# Ensure the tenant has enough zone record quotas to
# create new records
self._enforce_record_quota(context, domain, recordset)
# Update the recordset
recordset = self.storage.update_recordset(context, recordset)

View File

@ -1620,9 +1620,9 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(record['data'], values['data'])
self.assertIn('status', record)
def test_create_record_over_domain_quota(self):
def test_create_record_and_update_over_zone_quota(self):
# SOA and NS Records exist
self.config(quota_domain_records=3)
self.config(quota_domain_records=1)
# Creating the domain automatically creates SOA & NS records
domain = self.create_domain()
@ -1633,6 +1633,26 @@ class CentralServiceTest(CentralTestCase):
with testtools.ExpectedException(exceptions.OverQuota):
self.create_record(domain, recordset)
def test_create_record_over_zone_quota(self):
self.config(quota_domain_records=1)
# Creating the domain automatically creates SOA & NS records
domain = self.create_domain()
recordset = objects.RecordSet(
name='www.%s' % domain.name,
type='A',
records=objects.RecordList(objects=[
objects.Record(data='192.3.3.15'),
objects.Record(data='192.3.3.16'),
])
)
with testtools.ExpectedException(exceptions.OverQuota):
# Persist the Object
recordset = self.central_service.create_recordset(
self.admin_context, domain.id, recordset=recordset)
def test_create_record_over_recordset_quota(self):
self.config(quota_recordset_records=1)

View File

@ -384,6 +384,30 @@ class CentralServiceTestCase(CentralBasic):
self.assertEqual(rs, 'rs')
self.assertFalse(self.service._update_domain_in_storage.called)
def test_create_recordset_with_records_in_storage(self):
self.service._enforce_recordset_quota = mock.Mock()
self.service._enforce_record_quota = mock.Mock()
self.service._is_valid_recordset_name = mock.Mock()
self.service._is_valid_recordset_placement = mock.Mock()
self.service._is_valid_recordset_placement_subzone = mock.Mock()
self.service._is_valid_ttl = mock.Mock()
self.service.storage.create_recordset = mock.Mock(return_value='rs')
self.service.storage.find_domains = mock.Mock(return_value=[])
self.service._update_domain_in_storage = mock.Mock()
recordset = Mock()
recordset.obj_attr_is_set.return_value = True
recordset.records = [MockRecord()]
rs, zone = self.service._create_recordset_in_storage(
self.context, MockDomain(), recordset
)
assert self.service._enforce_record_quota.called
assert self.service._update_domain_in_storage.called
def test__create_soa(self):
self.service._create_recordset_in_storage = Mock(
return_value=(None, None)
@ -1149,6 +1173,7 @@ class CentralDomainTestCase(CentralBasic):
self.service._is_valid_recordset_placement_subdomain = Mock()
self.service._is_valid_ttl = Mock()
self.service._update_domain_in_storage = Mock()
self.service._enforce_record_quota = mock.Mock()
self.service._update_recordset_in_storage(
self.context,
@ -1173,6 +1198,7 @@ class CentralDomainTestCase(CentralBasic):
assert not self.service._is_valid_ttl.called
assert not self.service._update_domain_in_storage.called
assert self.service.storage.update_recordset.called
assert self.service._enforce_record_quota.called
def test_delete_recordset_not_found(self):
self.service.storage.get_domain.return_value = RoObject(
@ -1882,3 +1908,55 @@ class CentralZoneExportTests(CentralBasic):
pcheck, ctx, target = \
designate.central.service.policy.check.call_args[0]
self.assertEqual(pcheck, 'delete_zone_export')
class CentralStatusTests(CentralBasic):
def test__update_domain_or_record_status_no_domain(self):
domain = RwObject(
action='UPDATE',
status='SUCCESS',
serial=0,
)
dom, deleted = self.service.\
_update_domain_or_record_status(domain, 'NO_DOMAIN', 0)
self.assertEqual(dom.action, 'CREATE')
self.assertEqual(dom.status, 'ERROR')
class CentralQuotaTest(unittest.TestCase):
def setUp(self):
self.context = mock.Mock()
self.domain = mock.Mock()
@patch('designate.central.service.storage')
@patch('designate.central.service.quota')
def test_domain_record_quota_allows_lowering_value(self, quota, storage):
service = Service()
service.storage.count_records.return_value = 10
recordset = mock.Mock()
recordset.managed = False
recordset.records = ['1.1.1.%i' % (i + 1) for i in range(5)]
service._enforce_record_quota(
self.context, self.domain, recordset
)
# Ensure we check against the number of records that will
# result in the API call. The 5 value is as if there were 10
# unmanaged records unders a single recordset. We find 10
# total - 10 for the recordset being passed in and add the 5
# from the new recordset.
check_domain_records = mock.call(
self.context, self.domain.tenant_id, domain_records=10 - 10 + 5
)
assert check_domain_records in service.quota.limit_check.mock_calls
# Check the recordset limit as well
check_recordset_records = mock.call(
self.context, self.domain.tenant_id, recordset_records=10
)
assert check_recordset_records in service.quota.limit_check.mock_calls