Using "Designate API" in addition to DNS query client check

As of now the only validation done in Neutron&Designate E2E scenario
test cases is based on DNS query client only.
As a result, we don't really know what's happens on the Designate side,
for example, we wouldn't expect to see the recordset on Backend if it
wasn't previously created on the Designate.
This patch improves the situation and it will definitely help on debugging.
It will use "Designate APIs" when "query client" throws an assertion.
For example, if "A" type recordset wasn't detected on backend (as expected
by test), "Designate APIs" will be used to provide more details on failure.

Change-Id: Ie2093e457e0b664f36204d61f3020ef84ef15a3e
This commit is contained in:
Arkady Shtempler 2022-09-29 13:45:52 +03:00
parent 021ce916c6
commit d5acee3804
1 changed files with 81 additions and 9 deletions

View File

@ -17,9 +17,11 @@ import ipaddress
import testtools
from oslo_log import log
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
@ -30,6 +32,8 @@ from neutron_tempest_plugin.scenario import constants
CONF = config.CONF
LOG = log.getLogger(__name__)
# Note(jh): Need to do a bit of juggling here in order to avoid failures
# when designate_tempest_plugin is not available
@ -47,7 +51,8 @@ class BaseDNSIntegrationTests(base.BaseTempestTestCase, DNSMixin):
@classmethod
def setup_clients(cls):
super(BaseDNSIntegrationTests, cls).setup_clients()
cls.dns_client = cls.os_tempest.dns_v2.ZonesClient()
cls.zone_client = cls.os_tempest.dns_v2.ZonesClient()
cls.recordset_client = cls.os_tempest.dns_v2.RecordsetClient()
cls.query_client.build_timeout = 30
@classmethod
@ -63,11 +68,11 @@ class BaseDNSIntegrationTests(base.BaseTempestTestCase, DNSMixin):
@utils.requires_ext(extension="dns-integration", service="network")
def resource_setup(cls):
super(BaseDNSIntegrationTests, cls).resource_setup()
_, cls.zone = cls.dns_client.create_zone()
cls.addClassResourceCleanup(cls.dns_client.delete_zone,
cls.zone = cls.zone_client.create_zone()[1]
cls.addClassResourceCleanup(cls.zone_client.delete_zone,
cls.zone['id'], ignore_errors=lib_exc.NotFound)
dns_waiters.wait_for_zone_status(
cls.dns_client, cls.zone['id'], 'ACTIVE')
cls.zone_client, cls.zone['id'], 'ACTIVE')
cls.network = cls.create_network(dns_domain=cls.zone['name'])
cls.subnet = cls.create_subnet(cls.network)
@ -94,12 +99,79 @@ class BaseDNSIntegrationTests(base.BaseTempestTestCase, DNSMixin):
fip = self.create_floatingip(port=port)
return {'port': port, 'fip': fip, 'server': server}
def _check_type_in_recordsets(self, zone_id, rec_type):
types = [rec['type'] for rec in self.recordset_client.list_recordset(
zone_id)[1]['recordsets']]
if rec_type in types:
return True
return False
def _wait_for_type_in_recordsets(self, zone_id, type):
test_utils.call_until_true(
func=self._check_type_in_recordsets, zone_id=zone_id,
rec_type=type, duration=self.query_client.build_timeout,
sleep_for=5)
def _check_recordset_deleted(
self, recordset_client, zone_id, recordset_id):
return test_utils.call_and_ignore_notfound_exc(
recordset_client.show_recordset, zone_id, recordset_id) is None
def _verify_designate_recordset(
self, address, found=True, record_type='A'):
if found:
self._wait_for_type_in_recordsets(self.zone['id'], record_type)
recordsets = self.recordset_client.list_recordset(
self.zone['id'])[1]['recordsets']
relevant_type = [rec for rec in recordsets if
rec['type'] == record_type]
self.assertTrue(
relevant_type,
'Failed no {} type recordset has been detected in the '
'Designate DNS DB'.format(record_type))
rec_id = [rec['id'] for rec in relevant_type if address in
str(rec['records'])][0]
self.assertTrue(
rec_id, 'Record of type:{} with IP:{} was not detected in '
'the Designate DNS DB'.format(record_type, address))
dns_waiters.wait_for_recordset_status(
self.recordset_client, self.zone['id'], rec_id, 'ACTIVE')
else:
rec_id = None
recordsets = self.recordset_client.list_recordset(
self.zone['id'])[1]['recordsets']
relevant_type = [rec for rec in recordsets if
rec['type'] == record_type]
if relevant_type:
rec_id = [rec['id'] for rec in relevant_type if
address in str(rec['records'])][0]
if rec_id:
recordset_exists = test_utils.call_until_true(
func=self._check_recordset_deleted,
recordset_client=self.recordset_client,
zone_id=self.zone['id'], recordset_id=rec_id,
duration=self.query_client.build_timeout, sleep_for=5)
self.assertTrue(
recordset_exists,
'Failed, recordset type:{} and ID:{} is still exist in '
'the Designate DNS DB'.format(record_type, rec_id))
def _verify_dns_records(self, address, name, found=True, record_type='A'):
client = self.query_client
forward = name + '.' + self.zone['name']
reverse = ipaddress.ip_address(address).reverse_pointer
dns_waiters.wait_for_query(client, forward, record_type, found)
dns_waiters.wait_for_query(client, reverse, 'PTR', found)
record_types_to_check = [record_type, 'PTR']
for rec_type in record_types_to_check:
try:
if rec_type == 'PTR':
dns_waiters.wait_for_query(
client, reverse, rec_type, found)
else:
dns_waiters.wait_for_query(
client, forward, rec_type, found)
except Exception as e:
LOG.error(e)
self._verify_designate_recordset(address, found, rec_type)
if not found:
return
fwd_response = client.query(forward, record_type)
@ -222,11 +294,11 @@ class DNSIntegrationDomainPerProjectTests(BaseDNSIntegrationTests):
name)
dns_domain_template = "<user_id>.<project_id>.%s.zone." % name
_, cls.zone = cls.dns_client.create_zone(name=zone_name)
cls.addClassResourceCleanup(cls.dns_client.delete_zone,
cls.zone = cls.zone_client.create_zone(name=zone_name)[1]
cls.addClassResourceCleanup(cls.zone_client.delete_zone,
cls.zone['id'], ignore_errors=lib_exc.NotFound)
dns_waiters.wait_for_zone_status(
cls.dns_client, cls.zone['id'], 'ACTIVE')
cls.zone_client, cls.zone['id'], 'ACTIVE')
cls.network = cls.create_network(dns_domain=dns_domain_template)
cls.subnet = cls.create_subnet(cls.network,