Fix order of argument in assertEqual

Some tests used incorrect argument order in
assertEqual(observed, expected).

The correct order expected by testtools is
assertEqual(expected, observed)

Change-Id: I73b331a1c75aa9e136c64ebaf363e7a2ef6a3ebc
Partial-Bug: #1259292
This commit is contained in:
sonu.kumar 2015-09-29 11:33:16 +05:30
parent edccfaa473
commit 1f01af47e0
1 changed files with 198 additions and 198 deletions

View File

@ -262,38 +262,38 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(tld['id'])
self.assertEqual(tld['name'], self.get_tld_fixture(fixture=0)['name'])
self.assertEqual(self.get_tld_fixture(fixture=0)['name'], tld['name'])
# Create a TLD with more than one label
tld = self.create_tld(fixture=1)
# Ensure all values have been set correctly
self.assertIsNotNone(tld['id'])
self.assertEqual(tld['name'], self.get_tld_fixture(fixture=1)['name'])
self.assertEqual(self.get_tld_fixture(fixture=1)['name'], tld['name'])
def test_find_tlds(self):
# Ensure we have no tlds to start with.
tlds = self.central_service.find_tlds(self.admin_context)
self.assertEqual(len(tlds), 0)
self.assertEqual(0, len(tlds))
# Create a single tld
self.create_tld(fixture=0)
# Ensure we can retrieve the newly created tld
tlds = self.central_service.find_tlds(self.admin_context)
self.assertEqual(len(tlds), 1)
self.assertEqual(tlds[0]['name'],
self.get_tld_fixture(fixture=0)['name'])
self.assertEqual(1, len(tlds))
self.assertEqual(self.get_tld_fixture(fixture=0)['name'],
tlds[0]['name'])
# Create a second tld
self.create_tld(fixture=1)
# Ensure we can retrieve both tlds
tlds = self.central_service.find_tlds(self.admin_context)
self.assertEqual(len(tlds), 2)
self.assertEqual(tlds[0]['name'],
self.get_tld_fixture(fixture=0)['name'])
self.assertEqual(tlds[1]['name'],
self.get_tld_fixture(fixture=1)['name'])
self.assertEqual(2, len(tlds))
self.assertEqual(self.get_tld_fixture(fixture=0)['name'],
tlds[0]['name'])
self.assertEqual(self.get_tld_fixture(fixture=1)['name'],
tlds[1]['name'])
def test_get_tld(self):
# Create a tld
@ -304,8 +304,8 @@ class CentralServiceTest(CentralTestCase):
tld = self.central_service.get_tld(
self.admin_context, expected_tld['id'])
self.assertEqual(tld['id'], expected_tld['id'])
self.assertEqual(tld['name'], expected_tld['name'])
self.assertEqual(expected_tld['id'], tld['id'])
self.assertEqual(expected_tld['name'], tld['name'])
def test_update_tld(self):
# Create a tld
@ -345,31 +345,31 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(tsigkey['id'])
self.assertEqual(tsigkey['name'], values['name'])
self.assertEqual(tsigkey['algorithm'], values['algorithm'])
self.assertEqual(tsigkey['secret'], values['secret'])
self.assertEqual(values['name'], tsigkey['name'])
self.assertEqual(values['algorithm'], tsigkey['algorithm'])
self.assertEqual(values['secret'], tsigkey['secret'])
def test_find_tsigkeys(self):
# Ensure we have no tsigkeys to start with.
tsigkeys = self.central_service.find_tsigkeys(self.admin_context)
self.assertEqual(len(tsigkeys), 0)
self.assertEqual(0, len(tsigkeys))
# Create a single tsigkey (using default values)
tsigkey_one = self.create_tsigkey()
# Ensure we can retrieve the newly created tsigkey
tsigkeys = self.central_service.find_tsigkeys(self.admin_context)
self.assertEqual(len(tsigkeys), 1)
self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name'])
self.assertEqual(1, len(tsigkeys))
self.assertEqual(tsigkey_one['name'], tsigkeys[0]['name'])
# Create a second tsigkey
tsigkey_two = self.create_tsigkey(fixture=1)
# Ensure we can retrieve both tsigkeys
tsigkeys = self.central_service.find_tsigkeys(self.admin_context)
self.assertEqual(len(tsigkeys), 2)
self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name'])
self.assertEqual(tsigkeys[1]['name'], tsigkey_two['name'])
self.assertEqual(2, len(tsigkeys))
self.assertEqual(tsigkey_one['name'], tsigkeys[0]['name'])
self.assertEqual(tsigkey_two['name'], tsigkeys[1]['name'])
def test_get_tsigkey(self):
# Create a tsigkey
@ -379,10 +379,10 @@ class CentralServiceTest(CentralTestCase):
tsigkey = self.central_service.get_tsigkey(
self.admin_context, expected['id'])
self.assertEqual(tsigkey['id'], expected['id'])
self.assertEqual(tsigkey['name'], expected['name'])
self.assertEqual(tsigkey['algorithm'], expected['algorithm'])
self.assertEqual(tsigkey['secret'], expected['secret'])
self.assertEqual(expected['id'], tsigkey['id'])
self.assertEqual(expected['name'], tsigkey['name'])
self.assertEqual(expected['algorithm'], tsigkey['algorithm'])
self.assertEqual(expected['secret'], tsigkey['secret'])
def test_update_tsigkey(self):
# Create a tsigkey
@ -422,14 +422,14 @@ class CentralServiceTest(CentralTestCase):
# in the beginning, there should be nothing
tenants = self.central_service.count_tenants(admin_context)
self.assertEqual(tenants, 0)
self.assertEqual(0, tenants)
# Explicitly set a tenant_id
self.create_domain(fixture=0, context=tenant_one_context)
self.create_domain(fixture=1, context=tenant_two_context)
tenants = self.central_service.count_tenants(admin_context)
self.assertEqual(tenants, 2)
self.assertEqual(2, tenants)
def test_count_tenants_policy_check(self):
# Set the policy to reject the authz
@ -450,11 +450,11 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(domain['id'])
self.assertEqual(domain['name'], values['name'])
self.assertEqual(domain['email'], values['email'])
self.assertEqual(values['name'], domain['name'])
self.assertEqual(values['email'], domain['email'])
self.assertIn('status', domain)
self.assertEqual(mock_notifier.call_count, 1)
self.assertEqual(1, mock_notifier.call_count)
# Ensure the correct NS Records are in place
pool = self.central_service.get_pool(
@ -465,7 +465,7 @@ class CentralServiceTest(CentralTestCase):
criterion={'domain_id': domain.id, 'type': "NS"})
self.assertIsNotNone(ns_recordset.id)
self.assertEqual(ns_recordset.type, 'NS')
self.assertEqual('NS', ns_recordset.type)
self.assertIsNotNone(ns_recordset.records)
self.assertEqual(set([n.hostname for n in pool.ns_records]),
set([n.data for n in ns_recordset.records]))
@ -533,7 +533,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(domain['id'])
self.assertEqual(domain['parent_domain_id'], parent_domain['id'])
self.assertEqual(parent_domain['id'], domain['parent_domain_id'])
def test_create_subdomain_different_pools(self):
fixture = self.get_domain_fixture()
@ -574,7 +574,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(parent_domain['id'])
self.assertEqual(subdomain['parent_domain_id'], parent_domain['id'])
self.assertEqual(parent_domain['id'], subdomain['parent_domain_id'])
def test_create_subdomain_failure(self):
context = self.get_admin_context()
@ -719,29 +719,29 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, objects.Domain.from_dict(values))
# Ensure all values have been set correctly
self.assertEqual(domain['ttl'], values['ttl'])
self.assertEqual(values['ttl'], domain['ttl'])
def test_find_domains(self):
# Ensure we have no domains to start with.
domains = self.central_service.find_domains(self.admin_context)
self.assertEqual(len(domains), 0)
self.assertEqual(0, len(domains))
# Create a single domain (using default values)
self.create_domain()
# Ensure we can retrieve the newly created domain
domains = self.central_service.find_domains(self.admin_context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], 'example.com.')
self.assertEqual(1, len(domains))
self.assertEqual('example.com.', domains[0]['name'])
# Create a second domain
self.create_domain(name='example.net.')
# Ensure we can retrieve both domain
domains = self.central_service.find_domains(self.admin_context)
self.assertEqual(len(domains), 2)
self.assertEqual(domains[0]['name'], 'example.com.')
self.assertEqual(domains[1]['name'], 'example.net.')
self.assertEqual(2, len(domains))
self.assertEqual('example.com.', domains[0]['name'])
self.assertEqual('example.net.', domains[1]['name'])
def test_find_domains_criteria(self):
# Create a domain
@ -754,9 +754,9 @@ class CentralServiceTest(CentralTestCase):
domains = self.central_service.find_domains(
self.admin_context, criterion)
self.assertEqual(domains[0]['id'], expected_domain['id'])
self.assertEqual(domains[0]['name'], expected_domain['name'])
self.assertEqual(domains[0]['email'], expected_domain['email'])
self.assertEqual(expected_domain['id'], domains[0]['id'])
self.assertEqual(expected_domain['name'], domains[0]['name'])
self.assertEqual(expected_domain['email'], domains[0]['email'])
def test_find_domains_tenant_restrictions(self):
admin_context = self.get_admin_context()
@ -767,24 +767,24 @@ class CentralServiceTest(CentralTestCase):
# Ensure we have no domains to start with.
domains = self.central_service.find_domains(admin_context)
self.assertEqual(len(domains), 0)
self.assertEqual(0, len(domains))
# Create a single domain (using default values)
domain = self.create_domain(context=tenant_one_context)
# Ensure admins can retrieve the newly created domain
domains = self.central_service.find_domains(admin_context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], domain['name'])
self.assertEqual(1, len(domains))
self.assertEqual(domain['name'], domains[0]['name'])
# Ensure tenant=1 can retrieve the newly created domain
domains = self.central_service.find_domains(tenant_one_context)
self.assertEqual(len(domains), 1)
self.assertEqual(domains[0]['name'], domain['name'])
self.assertEqual(1, len(domains))
self.assertEqual(domain['name'], domains[0]['name'])
# Ensure tenant=2 can NOT retrieve the newly created domain
domains = self.central_service.find_domains(tenant_two_context)
self.assertEqual(len(domains), 0)
self.assertEqual(0, len(domains))
def test_get_domain(self):
# Create a domain
@ -795,9 +795,9 @@ class CentralServiceTest(CentralTestCase):
domain = self.central_service.get_domain(
self.admin_context, expected_domain['id'])
self.assertEqual(domain['id'], expected_domain['id'])
self.assertEqual(domain['name'], expected_domain['name'])
self.assertEqual(domain['email'], expected_domain['email'])
self.assertEqual(expected_domain['id'], domain['id'])
self.assertEqual(expected_domain['name'], domain['name'])
self.assertEqual(expected_domain['email'], domain['email'])
def test_get_domain_servers(self):
# Create a domain
@ -820,9 +820,9 @@ class CentralServiceTest(CentralTestCase):
domain = self.central_service.find_domain(
self.admin_context, criterion)
self.assertEqual(domain['id'], expected_domain['id'])
self.assertEqual(domain['name'], expected_domain['name'])
self.assertEqual(domain['email'], expected_domain['email'])
self.assertEqual(expected_domain['id'], domain['id'])
self.assertEqual(expected_domain['name'], domain['name'])
self.assertEqual(expected_domain['email'], domain['email'])
self.assertIn('status', domain)
@mock.patch.object(notifier.Notifier, "info")
@ -848,7 +848,7 @@ class CentralServiceTest(CentralTestCase):
self.assertTrue(domain.serial > original_serial)
self.assertEqual('info@example.net', domain.email)
self.assertEqual(mock_notifier.call_count, 1)
self.assertEqual(1, mock_notifier.call_count)
# Check that the object used in the notify is a Domain and the id
# matches up
@ -948,18 +948,18 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, domain['id'])
# Ensure the domain is marked for deletion
self.assertEqual(deleted_domain.id, domain.id)
self.assertEqual(deleted_domain.name, domain.name)
self.assertEqual(deleted_domain.email, domain.email)
self.assertEqual(deleted_domain.status, 'PENDING')
self.assertEqual(deleted_domain.tenant_id, domain.tenant_id)
self.assertEqual(deleted_domain.parent_domain_id,
domain.parent_domain_id)
self.assertEqual(deleted_domain.action, 'DELETE')
self.assertEqual(deleted_domain.serial, domain.serial)
self.assertEqual(deleted_domain.pool_id, domain.pool_id)
self.assertEqual(domain.id, deleted_domain.id)
self.assertEqual(domain.name, deleted_domain.name)
self.assertEqual(domain.email, deleted_domain.email)
self.assertEqual('PENDING', deleted_domain.status)
self.assertEqual(domain.tenant_id, deleted_domain.tenant_id)
self.assertEqual(domain.parent_domain_id,
deleted_domain.parent_domain_id)
self.assertEqual('DELETE', deleted_domain.action)
self.assertEqual(domain.serial, deleted_domain.serial)
self.assertEqual(domain.pool_id, deleted_domain.pool_id)
self.assertEqual(mock_notifier.call_count, 1)
self.assertEqual(1, mock_notifier.call_count)
# Check that the object used in the notify is a Domain and the id
# matches up
@ -985,7 +985,7 @@ class CentralServiceTest(CentralTestCase):
def test_count_domains(self):
# in the beginning, there should be nothing
domains = self.central_service.count_domains(self.admin_context)
self.assertEqual(domains, 0)
self.assertEqual(0, domains)
# Create a single domain
self.create_domain()
@ -994,7 +994,7 @@ class CentralServiceTest(CentralTestCase):
domains = self.central_service.count_domains(self.admin_context)
# well, did we get 1?
self.assertEqual(domains, 1)
self.assertEqual(1, domains)
def test_count_domains_policy_check(self):
# Set the policy to reject the authz
@ -1051,7 +1051,7 @@ class CentralServiceTest(CentralTestCase):
)
pxy = self.central_service.storage.session.execute(query)
self.assertEqual(pxy.rowcount, 1)
self.assertEqual(1, pxy.rowcount)
return domain
@mock.patch.object(notifier.Notifier, "info")
@ -1112,7 +1112,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
self._assert_count_all_domains(2)
self.assertEqual(purge_cnt, 1)
self.assertEqual(1, purge_cnt)
@mock.patch.object(notifier.Notifier, "info")
def test_purge_domains_without_time_threshold(self, mock_notifier):
@ -1132,7 +1132,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
self._assert_count_all_domains(1)
self.assertEqual(purge_cnt, 2)
self.assertEqual(2, purge_cnt)
@mock.patch.object(notifier.Notifier, "info")
def test_purge_domains_without_deleted_criterion(self, mock_notifier):
@ -1154,7 +1154,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
self._assert_count_all_domains(3)
self.assertEqual(purge_cnt, None)
self.assertEqual(None, purge_cnt)
@mock.patch.object(notifier.Notifier, "info")
def test_purge_domains_by_name(self, mock_notifier):
@ -1169,7 +1169,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
self._assert_count_all_domains(0)
self.assertEqual(purge_cnt, 1)
self.assertEqual(1, purge_cnt)
@mock.patch.object(notifier.Notifier, "info")
def test_purge_domains_without_any_criterion(self, mock_notifier):
@ -1197,7 +1197,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
n_zones = self.central_service.count_domains(self.admin_context)
self.assertEqual(n_zones, 1)
self.assertEqual(1, n_zones)
# purge domains in a shard that contains the domain created above
self.central_service.purge_domains(
@ -1210,7 +1210,7 @@ class CentralServiceTest(CentralTestCase):
limit=100,
)
n_zones = self.central_service.count_domains(self.admin_context)
self.assertEqual(n_zones, 0)
self.assertEqual(0, n_zones)
def test_purge_domains_walk_up_domains(self):
Zone = namedtuple('Zone', 'id parent_domain_id')
@ -1219,11 +1219,11 @@ class CentralServiceTest(CentralTestCase):
zones_by_id = {z.id: z for z in zones}
sid = self.central_service.storage._walk_up_domains(
zones[0], zones_by_id)
self.assertEqual(sid, 1234)
self.assertEqual(1234, sid)
sid = self.central_service.storage._walk_up_domains(
zones[-1], zones_by_id)
self.assertEqual(sid, 1234)
self.assertEqual(1234, sid)
def test_purge_domains_walk_up_domains_loop(self):
Zone = namedtuple('Zone', 'id parent_domain_id')
@ -1249,10 +1249,10 @@ class CentralServiceTest(CentralTestCase):
self._delete_domain(z3, old)
self._delete_domain(z4, old)
self.assertEqual(z2['parent_domain_id'], z1.id)
self.assertEqual(z3['parent_domain_id'], z2.id)
self.assertEqual(z4['parent_domain_id'], z3.id)
self.assertEqual(z5['parent_domain_id'], z4.id)
self.assertEqual(z1.id, z2['parent_domain_id'])
self.assertEqual(z2.id, z3['parent_domain_id'])
self.assertEqual(z3.id, z4['parent_domain_id'])
self.assertEqual(z4.id, z5['parent_domain_id'])
self._assert_count_all_domains(5)
mock_notifier.reset_mock()
@ -1272,11 +1272,11 @@ class CentralServiceTest(CentralTestCase):
self._log_all_domains(zones)
for z in zones:
if z.name == 'alive.org.':
self.assertEqual(z.parent_domain_id, None)
self.assertEqual(None, z.parent_domain_id)
elif z.name == 'alive2.del3.del2.deleted.alive.org.':
# alive2.del2.deleted.alive.org is to be reparented under
# alive.org
self.assertEqual(z.parent_domain_id, z1.id)
self.assertEqual(z1.id, z.parent_domain_id)
else:
raise Exception("Unexpected zone %r" % z)
@ -1379,7 +1379,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(recordset.records)
# The serial number does not get updated is there are no records
# in the recordset
self.assertEqual(new_serial, original_serial)
self.assertEqual(original_serial, new_serial)
def test_create_recordset_with_records(self):
domain = self.create_domain()
@ -1504,7 +1504,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context,
domain['id'],
recordset=objects.RecordSet.from_dict(values))
self.assertEqual(recordset['ttl'], values['ttl'])
self.assertEqual(values['ttl'], recordset['ttl'])
def test_get_recordset(self):
domain = self.create_domain()
@ -1516,9 +1516,9 @@ class CentralServiceTest(CentralTestCase):
recordset = self.central_service.get_recordset(
self.admin_context, domain['id'], expected['id'])
self.assertEqual(recordset['id'], expected['id'])
self.assertEqual(recordset['name'], expected['name'])
self.assertEqual(recordset['type'], expected['type'])
self.assertEqual(expected['id'], recordset['id'])
self.assertEqual(expected['name'], recordset['name'])
self.assertEqual(expected['type'], recordset['type'])
def test_get_recordset_with_records(self):
domain = self.create_domain()
@ -1558,7 +1558,7 @@ class CentralServiceTest(CentralTestCase):
recordsets = self.central_service.find_recordsets(
self.admin_context, criterion)
self.assertEqual(len(recordsets), 2)
self.assertEqual(2, len(recordsets))
# Create a single recordset (using default values)
self.create_recordset(domain, name='www.%s' % domain['name'])
@ -1567,8 +1567,8 @@ class CentralServiceTest(CentralTestCase):
recordsets = self.central_service.find_recordsets(
self.admin_context, criterion)
self.assertEqual(len(recordsets), 3)
self.assertEqual(recordsets[2]['name'], 'www.%s' % domain['name'])
self.assertEqual(3, len(recordsets))
self.assertEqual('www.%s' % domain['name'], recordsets[2]['name'])
# Create a second recordset
self.create_recordset(domain, name='mail.%s' % domain['name'])
@ -1577,9 +1577,9 @@ class CentralServiceTest(CentralTestCase):
recordsets = self.central_service.find_recordsets(
self.admin_context, criterion)
self.assertEqual(len(recordsets), 4)
self.assertEqual(recordsets[2]['name'], 'www.%s' % domain['name'])
self.assertEqual(recordsets[3]['name'], 'mail.%s' % domain['name'])
self.assertEqual(4, len(recordsets))
self.assertEqual('www.%s' % domain['name'], recordsets[2]['name'])
self.assertEqual('mail.%s' % domain['name'], recordsets[3]['name'])
def test_find_recordset(self):
domain = self.create_domain()
@ -1593,8 +1593,8 @@ class CentralServiceTest(CentralTestCase):
recordset = self.central_service.find_recordset(
self.admin_context, criterion)
self.assertEqual(recordset['id'], expected['id'])
self.assertEqual(recordset['name'], expected['name'])
self.assertEqual(expected['id'], recordset['id'], expected['id'])
self.assertEqual(expected['name'], recordset['name'])
def test_find_recordset_with_records(self):
domain = self.create_domain()
@ -1638,7 +1638,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, recordset.domain_id, recordset.id)
# Ensure the new value took
self.assertEqual(recordset.ttl, 1800)
self.assertEqual(1800, recordset.ttl)
self.assertThat(new_serial, GreaterThan(original_serial))
def test_update_recordset_deadlock_retry(self):
@ -1788,7 +1788,7 @@ class CentralServiceTest(CentralTestCase):
self.admin_context, recordset.domain_id, recordset.id)
# Ensure the recordset was updated correctly
self.assertEqual(recordset.ttl, 1800)
self.assertEqual(1800, recordset.ttl)
# Ensure the domains serial number was not updated
domain_after = self.central_service.get_domain(
@ -1906,7 +1906,7 @@ class CentralServiceTest(CentralTestCase):
def test_count_recordsets(self):
# in the beginning, there should be nothing
recordsets = self.central_service.count_recordsets(self.admin_context)
self.assertEqual(recordsets, 0)
self.assertEqual(0, recordsets)
# Create a domain to put our recordset in
domain = self.create_domain()
@ -1916,7 +1916,7 @@ class CentralServiceTest(CentralTestCase):
# We should have 1 recordset now, plus SOA & NS recordsets
recordsets = self.central_service.count_recordsets(self.admin_context)
self.assertEqual(recordsets, 3)
self.assertEqual(3, recordsets)
def test_count_recordsets_policy_check(self):
# Set the policy to reject the authz
@ -1941,7 +1941,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(record['id'])
self.assertEqual(record['data'], values['data'])
self.assertEqual(values['data'], record['data'])
self.assertIn('status', record)
def test_create_record_over_domain_quota(self):
@ -2000,8 +2000,8 @@ class CentralServiceTest(CentralTestCase):
record = self.central_service.get_record(
self.admin_context, domain['id'], recordset['id'], expected['id'])
self.assertEqual(record['id'], expected['id'])
self.assertEqual(record['data'], expected['data'])
self.assertEqual(expected['id'], record['id'])
self.assertEqual(expected['data'], record['data'])
self.assertIn('status', record)
def test_get_record_incorrect_domain_id(self):
@ -2045,7 +2045,7 @@ class CentralServiceTest(CentralTestCase):
records = self.central_service.find_records(
self.admin_context, criterion)
self.assertEqual(len(records), 0)
self.assertEqual(0, len(records))
# Create a single record (using default values)
expected_one = self.create_record(domain, recordset)
@ -2054,8 +2054,8 @@ class CentralServiceTest(CentralTestCase):
records = self.central_service.find_records(
self.admin_context, criterion)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]['data'], expected_one['data'])
self.assertEqual(1, len(records))
self.assertEqual(expected_one['data'], records[0]['data'])
# Create a second record
expected_two = self.create_record(domain, recordset, fixture=1)
@ -2064,9 +2064,9 @@ class CentralServiceTest(CentralTestCase):
records = self.central_service.find_records(
self.admin_context, criterion)
self.assertEqual(len(records), 2)
self.assertEqual(records[0]['data'], expected_one['data'])
self.assertEqual(records[1]['data'], expected_two['data'])
self.assertEqual(2, len(records))
self.assertEqual(expected_one['data'], records[0]['data'])
self.assertEqual(expected_two['data'], records[1]['data'])
def test_find_record(self):
domain = self.create_domain()
@ -2085,8 +2085,8 @@ class CentralServiceTest(CentralTestCase):
record = self.central_service.find_record(
self.admin_context, criterion)
self.assertEqual(record['id'], expected['id'])
self.assertEqual(record['data'], expected['data'])
self.assertEqual(expected['id'], record['id'])
self.assertEqual(expected['data'], record['data'])
self.assertIn('status', record)
def test_update_record(self):
@ -2198,14 +2198,14 @@ class CentralServiceTest(CentralTestCase):
record['id'])
# Ensure the record is marked for deletion
self.assertEqual(deleted_record.id, record.id)
self.assertEqual(deleted_record.data, record.data)
self.assertEqual(deleted_record.domain_id, record.domain_id)
self.assertEqual(deleted_record.status, 'PENDING')
self.assertEqual(deleted_record.tenant_id, record.tenant_id)
self.assertEqual(deleted_record.recordset_id, record.recordset_id)
self.assertEqual(deleted_record.action, 'DELETE')
self.assertEqual(deleted_record.serial, new_domain_serial)
self.assertEqual(record.id, deleted_record.id)
self.assertEqual(record.data, deleted_record.data)
self.assertEqual(record.domain_id, deleted_record.domain_id)
self.assertEqual('PENDING', deleted_record.status)
self.assertEqual(record.tenant_id, deleted_record.tenant_id)
self.assertEqual(record.recordset_id, deleted_record.recordset_id)
self.assertEqual('DELETE', deleted_record.action)
self.assertEqual(new_domain_serial, deleted_record.serial)
def test_delete_record_without_incrementing_serial(self):
domain = self.create_domain()
@ -2226,7 +2226,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure the domains serial number was not updated
new_domain_serial = self.central_service.get_domain(
self.admin_context, domain['id'])['serial']
self.assertEqual(new_domain_serial, domain_serial)
self.assertEqual(domain_serial, new_domain_serial)
# Fetch the record
deleted_record = self.central_service.get_record(
@ -2234,14 +2234,14 @@ class CentralServiceTest(CentralTestCase):
record['id'])
# Ensure the record is marked for deletion
self.assertEqual(deleted_record.id, record.id)
self.assertEqual(deleted_record.data, record.data)
self.assertEqual(deleted_record.domain_id, record.domain_id)
self.assertEqual(deleted_record.status, 'PENDING')
self.assertEqual(deleted_record.tenant_id, record.tenant_id)
self.assertEqual(deleted_record.recordset_id, record.recordset_id)
self.assertEqual(deleted_record.action, 'DELETE')
self.assertEqual(deleted_record.serial, new_domain_serial)
self.assertEqual(record.id, deleted_record.id)
self.assertEqual(record.data, deleted_record.data)
self.assertEqual(record.domain_id, deleted_record.domain_id)
self.assertEqual('PENDING', deleted_record.status)
self.assertEqual(record.tenant_id, deleted_record.tenant_id)
self.assertEqual(record.recordset_id, deleted_record.recordset_id)
self.assertEqual('DELETE', deleted_record.action)
self.assertEqual(new_domain_serial, deleted_record.serial)
def test_delete_record_incorrect_domain_id(self):
domain = self.create_domain()
@ -2274,7 +2274,7 @@ class CentralServiceTest(CentralTestCase):
def test_count_records(self):
# in the beginning, there should be nothing
records = self.central_service.count_records(self.admin_context)
self.assertEqual(records, 0)
self.assertEqual(0, records)
# Create a domain and recordset to put our record in
domain = self.create_domain()
@ -2285,7 +2285,7 @@ class CentralServiceTest(CentralTestCase):
# we should have 1 record now, plus SOA & NS records
records = self.central_service.count_records(self.admin_context)
self.assertEqual(records, 3)
self.assertEqual(3, records)
def test_count_records_policy_check(self):
# Set the policy to reject the authz
@ -2621,8 +2621,8 @@ class CentralServiceTest(CentralTestCase):
# Verify all values have been set correctly
self.assertIsNotNone(blacklist['id'])
self.assertEqual(blacklist['pattern'], values['pattern'])
self.assertEqual(blacklist['description'], values['description'])
self.assertEqual(values['pattern'], blacklist['pattern'])
self.assertEqual(values['description'], blacklist['description'])
def test_get_blacklist(self):
# Create a blacklisted zone
@ -2632,16 +2632,16 @@ class CentralServiceTest(CentralTestCase):
blacklist = self.central_service.get_blacklist(
self.admin_context, expected['id'])
self.assertEqual(blacklist['id'], expected['id'])
self.assertEqual(blacklist['pattern'], expected['pattern'])
self.assertEqual(blacklist['description'], expected['description'])
self.assertEqual(expected['id'], blacklist['id'])
self.assertEqual(expected['pattern'], blacklist['pattern'])
self.assertEqual(expected['description'], blacklist['description'])
def test_find_blacklists(self):
# Verify there are no blacklisted zones to start with
blacklists = self.central_service.find_blacklists(
self.admin_context)
self.assertEqual(len(blacklists), 0)
self.assertEqual(0, len(blacklists))
# Create a single blacklisted zone
self.create_blacklist()
@ -2651,8 +2651,8 @@ class CentralServiceTest(CentralTestCase):
self.admin_context)
values1 = self.get_blacklist_fixture(fixture=0)
self.assertEqual(len(blacklists), 1)
self.assertEqual(blacklists[0]['pattern'], values1['pattern'])
self.assertEqual(1, len(blacklists))
self.assertEqual(values1['pattern'], blacklists[0]['pattern'])
# Create a second blacklisted zone
self.create_blacklist(fixture=1)
@ -2663,9 +2663,9 @@ class CentralServiceTest(CentralTestCase):
values2 = self.get_blacklist_fixture(fixture=1)
self.assertEqual(len(blacklists), 2)
self.assertEqual(blacklists[0]['pattern'], values1['pattern'])
self.assertEqual(blacklists[1]['pattern'], values2['pattern'])
self.assertEqual(2, len(blacklists))
self.assertEqual(values1['pattern'], blacklists[0]['pattern'])
self.assertEqual(values2['pattern'], blacklists[1]['pattern'])
def test_find_blacklist(self):
# Create a blacklisted zone
@ -2675,8 +2675,8 @@ class CentralServiceTest(CentralTestCase):
blacklist = self.central_service.find_blacklist(
self.admin_context, {'id': expected['id']})
self.assertEqual(blacklist['pattern'], expected['pattern'])
self.assertEqual(blacklist['description'], expected['description'])
self.assertEqual(expected['pattern'], blacklist['pattern'])
self.assertEqual(expected['description'], blacklist['description'])
def test_update_blacklist(self):
# Create a blacklisted zone
@ -2731,12 +2731,12 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(soa.id)
self.assertEqual('SOA', soa.type)
self.assertIsNotNone(soa.records)
self.assertEqual(int(soa_record_values[2]), zone['serial'])
self.assertEqual(soa_record_values[1], zone_email)
self.assertEqual(int(soa_record_values[3]), zone['refresh'])
self.assertEqual(int(soa_record_values[4]), zone['retry'])
self.assertEqual(int(soa_record_values[5]), zone['expire'])
self.assertEqual(int(soa_record_values[6]), zone['minimum'])
self.assertEqual(zone['serial'], int(soa_record_values[2]))
self.assertEqual(zone_email, soa_record_values[1])
self.assertEqual(zone['refresh'], int(soa_record_values[3]))
self.assertEqual(zone['retry'], int(soa_record_values[4]))
self.assertEqual(zone['expire'], int(soa_record_values[5]))
self.assertEqual(zone['minimum'], int(soa_record_values[6]))
def test_update_soa(self):
# Anytime the zone's serial number is incremented
@ -2762,7 +2762,7 @@ class CentralServiceTest(CentralTestCase):
# Split out the various soa values
soa_record_values = soa.records[0].data.split()
self.assertEqual(int(soa_record_values[2]), updated_zone['serial'])
self.assertEqual(updated_zone['serial'], int(soa_record_values[2]))
# Pool Tests
def test_create_pool(self):
@ -2781,7 +2781,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(pool['attributes'])
self.assertIsNotNone(pool['ns_records'])
self.assertEqual(pool['name'], values['name'])
self.assertEqual(values['name'], pool['name'])
# Compare the actual values of attributes and ns_records
for k in range(0, len(values['attributes'])):
@ -2803,30 +2803,30 @@ class CentralServiceTest(CentralTestCase):
pool = self.central_service.get_pool(self.admin_context,
expected['id'])
self.assertEqual(pool['id'], expected['id'])
self.assertEqual(pool['created_at'], expected['created_at'])
self.assertEqual(pool['version'], expected['version'])
self.assertEqual(pool['tenant_id'], expected['tenant_id'])
self.assertEqual(pool['name'], expected['name'])
self.assertEqual(expected['id'], pool['id'])
self.assertEqual(expected['created_at'], pool['created_at'])
self.assertEqual(expected['version'], pool['version'])
self.assertEqual(expected['tenant_id'], pool['tenant_id'])
self.assertEqual(expected['name'], pool['name'])
# Compare the actual values of attributes and ns_records
for k in range(0, len(expected['attributes'])):
self.assertEqual(
pool['attributes'][k].to_primitive()['designate_object.data'],
expected['attributes'][k].to_primitive()
['designate_object.data'])
['designate_object.data'],
pool['attributes'][k].to_primitive()['designate_object.data'])
for k in range(0, len(expected['ns_records'])):
self.assertEqual(
pool['ns_records'][k].to_primitive()['designate_object.data'],
expected['ns_records'][k].to_primitive()
['designate_object.data'])
['designate_object.data'],
pool['ns_records'][k].to_primitive()['designate_object.data'])
def test_find_pools(self):
# Verify no pools exist, except for default pool
pools = self.central_service.find_pools(self.admin_context)
self.assertEqual(len(pools), 1)
self.assertEqual(1, len(pools))
# Create a pool
self.create_pool(fixture=0)
@ -2835,21 +2835,21 @@ class CentralServiceTest(CentralTestCase):
pools = self.central_service.find_pools(self.admin_context)
values = self.get_pool_fixture(fixture=0)
self.assertEqual(len(pools), 2)
self.assertEqual(pools[1]['name'], values['name'])
self.assertEqual(2, len(pools))
self.assertEqual(values['name'], pools[1]['name'])
# Compare the actual values of attributes and ns_records
expected_attributes = values['attributes'][0]
actual_attributes = \
pools[1]['attributes'][0].to_primitive()['designate_object.data']
for k in expected_attributes:
self.assertEqual(actual_attributes[k], expected_attributes[k])
self.assertEqual(expected_attributes[k], actual_attributes[k])
expected_ns_records = values['ns_records'][0]
actual_ns_records = \
pools[1]['ns_records'][0].to_primitive()['designate_object.data']
for k in expected_ns_records:
self.assertEqual(actual_ns_records[k], expected_ns_records[k])
self.assertEqual(expected_ns_records[k], actual_ns_records[k])
def test_find_pool(self):
# Create a server pool
@ -2859,20 +2859,20 @@ class CentralServiceTest(CentralTestCase):
pool = self.central_service.find_pool(self.admin_context,
{'id': expected['id']})
self.assertEqual(pool['name'], expected['name'])
self.assertEqual(expected['name'], pool['name'])
# Compare the actual values of attributes and ns_records
for k in range(0, len(expected['attributes'])):
self.assertEqual(
pool['attributes'][k].to_primitive()['designate_object.data'],
expected['attributes'][k].to_primitive()
['designate_object.data'])
['designate_object.data'],
pool['attributes'][k].to_primitive()['designate_object.data'])
for k in range(0, len(expected['ns_records'])):
self.assertEqual(
pool['ns_records'][k].to_primitive()['designate_object.data'],
expected['ns_records'][k].to_primitive()
['designate_object.data'])
['designate_object.data'],
pool['ns_records'][k].to_primitive()['designate_object.data'])
def test_update_pool(self):
# Create a server pool
@ -3031,7 +3031,7 @@ class CentralServiceTest(CentralTestCase):
new_domain_serial = self.central_service.get_domain(
self.admin_context, domain['id']).serial
self.assertEqual(new_domain_serial, domain_serial)
self.assertEqual(domain_serial, new_domain_serial)
def test_create_zone_transfer_request(self):
domain = self.create_domain()
@ -3041,7 +3041,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(zone_transfer_request.id)
self.assertIsNotNone(zone_transfer_request.tenant_id)
self.assertIsNotNone(zone_transfer_request.key)
self.assertEqual(zone_transfer_request.domain_id, domain.id)
self.assertEqual(domain.id, zone_transfer_request.domain_id)
def test_create_zone_transfer_request_duplicate(self):
domain = self.create_domain()
@ -3059,11 +3059,11 @@ class CentralServiceTest(CentralTestCase):
# Verify all values have been set correctly
self.assertIsNotNone(zone_transfer_request.id)
self.assertIsNotNone(zone_transfer_request.tenant_id)
self.assertEqual(zone_transfer_request.domain_id, domain.id)
self.assertEqual(domain.id, zone_transfer_request.domain_id)
self.assertIsNotNone(zone_transfer_request.key)
self.assertEqual(
zone_transfer_request.target_tenant_id,
values['target_tenant_id'])
values['target_tenant_id'],
zone_transfer_request.target_tenant_id)
def test_get_zone_transfer_request(self):
domain = self.create_domain()
@ -3107,7 +3107,7 @@ class CentralServiceTest(CentralTestCase):
self.assertIsNotNone(zone_transfer_request.id)
self.assertIsNotNone(zone_transfer_request.tenant_id)
self.assertIsNotNone(zone_transfer_request.key)
self.assertEqual(zone_transfer_request.description, 'TEST')
self.assertEqual('TEST', zone_transfer_request.description)
def test_delete_zone_transfer_request(self):
domain = self.create_domain()
@ -3163,15 +3163,15 @@ class CentralServiceTest(CentralTestCase):
admin_context, zone_transfer_request.id)
self.assertEqual(
result['domain'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['domain'].tenant_id)
self.assertEqual(
result['recordset'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['recordset'].tenant_id)
self.assertEqual(
result['record'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['record'].tenant_id)
self.assertEqual(
result['zt_accept'].status, 'COMPLETE')
'COMPLETE', result['zt_accept'].status)
self.assertEqual(
result['zt_request'].status, 'COMPLETE')
'COMPLETE', result['zt_request'].status)
def test_create_zone_transfer_accept_scoped(self):
tenant_1_context = self.get_context(tenant=1)
@ -3216,15 +3216,15 @@ class CentralServiceTest(CentralTestCase):
admin_context, zone_transfer_request.id)
self.assertEqual(
result['domain'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['domain'].tenant_id)
self.assertEqual(
result['recordset'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['recordset'].tenant_id)
self.assertEqual(
result['record'].tenant_id, str(tenant_2_context.tenant))
str(tenant_2_context.tenant), result['record'].tenant_id)
self.assertEqual(
result['zt_accept'].status, 'COMPLETE')
'COMPLETE', result['zt_accept'].status)
self.assertEqual(
result['zt_request'].status, 'COMPLETE')
'COMPLETE', result['zt_request'].status)
def test_create_zone_transfer_accept_failed_key(self):
tenant_1_context = self.get_context(tenant=1)
@ -3286,9 +3286,9 @@ class CentralServiceTest(CentralTestCase):
# Ensure all values have been set correctly
self.assertIsNotNone(zone_import['id'])
self.assertEqual(zone_import.status, 'PENDING')
self.assertEqual(zone_import.message, None)
self.assertEqual(zone_import.domain_id, None)
self.assertEqual('PENDING', zone_import.status)
self.assertEqual(None, zone_import.message)
self.assertEqual(None, zone_import.domain_id)
self.wait_for_import(zone_import.id)
@ -3298,7 +3298,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure we have no zone_imports to start with.
zone_imports = self.central_service.find_zone_imports(
self.admin_context)
self.assertEqual(len(zone_imports), 0)
self.assertEqual(0, len(zone_imports))
# Create a single zone_import
request_body = self.get_zonefile_fixture()
@ -3311,7 +3311,7 @@ class CentralServiceTest(CentralTestCase):
# Ensure we can retrieve the newly created zone_import
zone_imports = self.central_service.find_zone_imports(
self.admin_context)
self.assertEqual(len(zone_imports), 1)
self.assertEqual(1, len(zone_imports))
# Create a second zone_import
request_body = self.get_zonefile_fixture(variant="two")
@ -3324,9 +3324,9 @@ class CentralServiceTest(CentralTestCase):
# Ensure we can retrieve both zone_imports
zone_imports = self.central_service.find_zone_imports(
self.admin_context)
self.assertEqual(len(zone_imports), 2)
self.assertEqual(zone_imports[0].status, 'COMPLETE')
self.assertEqual(zone_imports[1].status, 'COMPLETE')
self.assertEqual(2, len(zone_imports))
self.assertEqual('COMPLETE', zone_imports[0].status)
self.assertEqual('COMPLETE', zone_imports[1].status)
def test_get_zone_import(self):
# Create a Zone Import
@ -3342,8 +3342,8 @@ class CentralServiceTest(CentralTestCase):
zone_import = self.central_service.get_zone_import(
self.admin_context, zone_import.id)
self.assertEqual(zone_import['id'], zone_import.id)
self.assertEqual(zone_import['status'], zone_import.status)
self.assertEqual(zone_import.id, zone_import['id'])
self.assertEqual(zone_import.status, zone_import['status'])
self.assertEqual('COMPLETE', zone_import.status)
def test_update_zone_import(self):