Update mDNS to accept server object
Mdns now accepts a server object rather than destination host:port. This simplifies processing on the pool manager. Change-Id: I17ee0d65167cbc57098e5f0fd75593f75bf946b3 Closes-Bug: 1412601
This commit is contained in:
parent
c5ec416b7a
commit
293390f6e4
|
@ -18,7 +18,6 @@ import dns
|
|||
from oslo import messaging
|
||||
from oslo.config import cfg
|
||||
|
||||
from designate import exceptions
|
||||
from designate.pool_manager import rpcapi as pool_mngr_api
|
||||
from designate.openstack.common import log as logging
|
||||
from designate.i18n import _LI
|
||||
|
@ -41,16 +40,15 @@ class NotifyEndpoint(object):
|
|||
def pool_manager_api(self):
|
||||
return pool_mngr_api.PoolManagerAPI.get_instance()
|
||||
|
||||
def notify_zone_changed(self, context, domain, destination, timeout,
|
||||
def notify_zone_changed(self, context, domain, server, timeout,
|
||||
retry_interval, max_retries, delay):
|
||||
"""
|
||||
:param context: The user context.
|
||||
:param domain: The designate domain object. This contains the domain
|
||||
name.
|
||||
:param destination: The destination of the NOTIFY message. This is of
|
||||
the format "ip:[port]". If there is no port, port 53 is used.
|
||||
:param server: A notify is sent to server.host:server.port.
|
||||
:param timeout: The time (in seconds) to wait for a NOTIFY response
|
||||
from destination.
|
||||
from server.
|
||||
:param retry_interval: The time (in seconds) between retries.
|
||||
:param max_retries: The maximum number of retries mindns would do for
|
||||
sending a NOTIFY message. After this many retries, mindns gives up.
|
||||
|
@ -62,20 +60,18 @@ class NotifyEndpoint(object):
|
|||
"""
|
||||
time.sleep(delay)
|
||||
return self._make_and_send_dns_message(
|
||||
domain, destination, timeout, retry_interval, max_retries,
|
||||
notify=True)
|
||||
domain, server, timeout, retry_interval, max_retries, notify=True)
|
||||
|
||||
def poll_for_serial_number(self, context, domain, destination, timeout,
|
||||
def poll_for_serial_number(self, context, domain, server, timeout,
|
||||
retry_interval, max_retries, delay):
|
||||
"""
|
||||
:param context: The user context.
|
||||
:param domain: The designate domain object. This contains the domain
|
||||
name. domain.serial = expected_serial
|
||||
:param destination: The server to check for an updated serial number.
|
||||
This is of the format "ip:[port]". If there is no port, port 53 is
|
||||
used.
|
||||
:param server: server.host:server.port is checked for an updated serial
|
||||
number.
|
||||
:param timeout: The time (in seconds) to wait for a SOA response from
|
||||
destination.
|
||||
server.
|
||||
:param retry_interval: The time (in seconds) between retries.
|
||||
:param max_retries: The maximum number of retries mindns would do for
|
||||
an expected serial number. After this many retries, mindns returns
|
||||
|
@ -84,7 +80,7 @@ class NotifyEndpoint(object):
|
|||
:return: a tuple of (status, actual_serial, retries)
|
||||
status is either "SUCCESS" or "ERROR".
|
||||
actual_serial is either the serial number returned in the SOA
|
||||
message from the destination or None.
|
||||
message from the server or None.
|
||||
retries is the number of retries left.
|
||||
The return value is just used for testing and not by pool manager.
|
||||
The pool manager is informed of the status with update_status.
|
||||
|
@ -96,7 +92,7 @@ class NotifyEndpoint(object):
|
|||
time.sleep(delay)
|
||||
while (True):
|
||||
(response, retry) = self._make_and_send_dns_message(
|
||||
domain, destination, timeout, retry_interval, retries)
|
||||
domain, server, timeout, retry_interval, retries)
|
||||
if response and len(response.answer) == 1 \
|
||||
and str(response.answer[0].name) == str(domain.name) \
|
||||
and response.answer[0].rdclass == dns.rdataclass.IN \
|
||||
|
@ -110,12 +106,12 @@ class NotifyEndpoint(object):
|
|||
# TODO(vinod): Account for serial number wrap around.
|
||||
elif actual_serial < domain.serial:
|
||||
retries = retries - retry
|
||||
LOG.warn(_LW("Got lower serial for '%(zone)s' to '%(dst)s'. "
|
||||
"Expected:'%(es)d'. Got:'%(as)d'."
|
||||
LOG.warn(_LW("Got lower serial for '%(zone)s' to '%(host)s:"
|
||||
"%(port)s'. Expected:'%(es)d'. Got:'%(as)d'."
|
||||
"Retries left='%(retries)d'") %
|
||||
{'zone': domain.name, 'dst': destination,
|
||||
'es': domain.serial, 'as': actual_serial,
|
||||
'retries': retries})
|
||||
{'zone': domain.name, 'host': server.host,
|
||||
'port': server.port, 'es': domain.serial,
|
||||
'as': actual_serial, 'retries': retries})
|
||||
if retries > 0:
|
||||
# retry again
|
||||
time.sleep(retry_interval)
|
||||
|
@ -128,18 +124,18 @@ class NotifyEndpoint(object):
|
|||
break
|
||||
|
||||
self.pool_manager_api.update_status(
|
||||
context, domain, destination, status, actual_serial)
|
||||
context, domain, server, status, actual_serial)
|
||||
|
||||
# Return some values for testing purposes.
|
||||
return (status, actual_serial, retries)
|
||||
|
||||
def _make_and_send_dns_message(self, domain, destination, timeout,
|
||||
def _make_and_send_dns_message(self, domain, server, timeout,
|
||||
retry_interval, max_retries, notify=False):
|
||||
"""
|
||||
:param domain: The designate domain object. This contains the domain
|
||||
name.
|
||||
:param destination: The destination for the dns message. This is of the
|
||||
format "ip:[port]". If there is no port, port 53 is used.
|
||||
:param server: The destination for the dns message is
|
||||
server.host:server.port.
|
||||
:param timeout: The time (in seconds) to wait for a response from
|
||||
destination.
|
||||
:param retry_interval: The time (in seconds) between retries.
|
||||
|
@ -151,19 +147,8 @@ class NotifyEndpoint(object):
|
|||
response is the response on success or None on failure.
|
||||
current_retry is the current retry number
|
||||
"""
|
||||
dest = destination.split(':')
|
||||
if len(dest) > 2:
|
||||
# Throw an exception
|
||||
raise exceptions.ConfigurationError(
|
||||
"'destination' not in the correct format. Expected format "
|
||||
"'ipaddress:port'. Got %(dst)s" % {'dst': destination})
|
||||
elif len(dest) == 1:
|
||||
dest_ip = dest[0]
|
||||
# No port - use default port of 53
|
||||
dest_port = 53
|
||||
else:
|
||||
dest_ip = dest[0]
|
||||
dest_port = int(dest[1])
|
||||
dest_ip = server.host
|
||||
dest_port = server.port
|
||||
|
||||
dns_message = self._make_dns_message(domain.name, notify=notify)
|
||||
|
||||
|
|
|
@ -57,31 +57,29 @@ class MdnsAPI(object):
|
|||
MDNS_API = cls()
|
||||
return MDNS_API
|
||||
|
||||
def notify_zone_changed(self, context, domain, destination, timeout,
|
||||
def notify_zone_changed(self, context, domain, server, timeout,
|
||||
retry_interval, max_retries, delay):
|
||||
LOG.info(_LI("notify_zone_changed: Calling mdns for zone '%(zone)s', "
|
||||
"serial '%(serial)s' to server '%(dst)s'") %
|
||||
"serial '%(serial)s' to server '%(host)s:%(port)s'") %
|
||||
{'zone': domain.name, 'serial': domain.serial,
|
||||
'dst': destination})
|
||||
'host': server.host, 'port': server.port})
|
||||
# The notify_zone_changed method is a cast rather than a call since the
|
||||
# caller need not wait for the notify to complete.
|
||||
return self.notify_client.cast(
|
||||
context, 'notify_zone_changed', domain=domain,
|
||||
destination=destination, timeout=timeout,
|
||||
retry_interval=retry_interval, max_retries=max_retries,
|
||||
delay=delay)
|
||||
server=server, timeout=timeout, retry_interval=retry_interval,
|
||||
max_retries=max_retries, delay=delay)
|
||||
|
||||
def poll_for_serial_number(self, context, domain, destination, timeout,
|
||||
def poll_for_serial_number(self, context, domain, server, timeout,
|
||||
retry_interval, max_retries, delay):
|
||||
LOG.info(_LI("poll_for_serial_number: Calling mdns for zone '%(zone)s'"
|
||||
", serial '%(serial)s' to server '%(dst)s'") %
|
||||
", serial '%(serial)s' to server '%(host)s:%(port)s'") %
|
||||
{'zone': domain.name, 'serial': domain.serial,
|
||||
'dst': destination})
|
||||
'host': server.host, 'port': server.port})
|
||||
# The poll_for_serial_number method is a cast rather than a call since
|
||||
# the caller need not wait for the poll to complete. Mdns informs pool
|
||||
# manager of the return value using update_status
|
||||
return self.notify_client.cast(
|
||||
context, 'poll_for_serial_number', domain=domain,
|
||||
destination=destination, timeout=timeout,
|
||||
retry_interval=retry_interval, max_retries=max_retries,
|
||||
delay=delay)
|
||||
server=server, timeout=timeout, retry_interval=retry_interval,
|
||||
max_retries=max_retries, delay=delay)
|
||||
|
|
|
@ -76,10 +76,10 @@ class PoolManagerAPI(object):
|
|||
return self.client.cast(
|
||||
context, 'update_domain', domain=domain)
|
||||
|
||||
def update_status(self, context, domain, destination,
|
||||
def update_status(self, context, domain, server,
|
||||
status, actual_serial):
|
||||
LOG.info(_LI("update_status: Calling pool manager's update_status "
|
||||
"for %(domain)s") % {'domain': domain.name})
|
||||
return self.client.cast(
|
||||
context, 'update_status', domain=domain, destination=destination,
|
||||
context, 'update_status', domain=domain, server=server,
|
||||
status=status, actual_serial=actual_serial)
|
||||
|
|
|
@ -223,12 +223,12 @@ class Service(service.RPCService):
|
|||
for server_backend in self.server_backends:
|
||||
self._update_domain_on_server(context, domain, server_backend)
|
||||
|
||||
def update_status(self, context, domain, destination,
|
||||
def update_status(self, context, domain, server,
|
||||
status, actual_serial):
|
||||
"""
|
||||
:param context: Security context information.
|
||||
:param domain: The designate domain object.
|
||||
:param destination: The server in the format "ip:port".
|
||||
:param server: The server for which a status update is being sent.
|
||||
:param status: The status, 'SUCCESS' or 'ERROR'.
|
||||
:param actual_serial: The actual serial number received from the name
|
||||
server for the domain.
|
||||
|
@ -236,7 +236,6 @@ class Service(service.RPCService):
|
|||
"""
|
||||
LOG.debug("Calling update_status for %s" % domain.name)
|
||||
|
||||
server = self._get_server(destination)
|
||||
update_status = self._retrieve_from_cache(
|
||||
context, server, domain, UPDATE_ACTION)
|
||||
cache_serial = update_status.serial_number
|
||||
|
@ -441,13 +440,13 @@ class Service(service.RPCService):
|
|||
|
||||
def _notify_zone_changed(self, context, domain, server):
|
||||
self.mdns_api.notify_zone_changed(
|
||||
context, domain, self._get_destination(server),
|
||||
self.timeout, self.retry_interval, self.max_retries, 0)
|
||||
context, domain, server, self.timeout, self.retry_interval,
|
||||
self.max_retries, 0)
|
||||
|
||||
def _poll_for_serial_number(self, context, domain, server):
|
||||
self.mdns_api.poll_for_serial_number(
|
||||
context, domain, self._get_destination(server), self.timeout,
|
||||
self.retry_interval, self.max_retries, self.delay)
|
||||
context, domain, server, self.timeout, self.retry_interval,
|
||||
self.max_retries, self.delay)
|
||||
|
||||
def _get_server_backend(self, server_id):
|
||||
for server_backend in self.server_backends:
|
||||
|
@ -459,13 +458,6 @@ class Service(service.RPCService):
|
|||
def _get_destination(server):
|
||||
return '%s:%s' % (server.host, server.port)
|
||||
|
||||
def _get_server(self, destination):
|
||||
parts = destination.split(':')
|
||||
for server_backend in self.server_backends:
|
||||
server = server_backend['server']
|
||||
if server.host == parts[0] and server.port == int(parts[1]):
|
||||
return server
|
||||
|
||||
@staticmethod
|
||||
def _percentage(count, total_count):
|
||||
return (Decimal(count) / Decimal(total_count)) * Decimal(100)
|
||||
|
|
|
@ -33,6 +33,13 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
def setUp(self):
|
||||
super(MdnsNotifyTest, self).setUp()
|
||||
self.notify = notify.NotifyEndpoint()
|
||||
server_values = {
|
||||
'id': 'f278782a-07dc-4502-9177-b5d85c5f7c7e',
|
||||
'host': '127.0.0.1',
|
||||
'port': 65255,
|
||||
'backend': 'fake'
|
||||
}
|
||||
self.server = objects.PoolServer(**server_values)
|
||||
|
||||
def test_send_notify_message(self):
|
||||
# id 10001
|
||||
|
@ -50,7 +57,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
|
||||
binascii.a2b_hex(expected_notify_response))):
|
||||
response, retry = self.notify.notify_zone_changed(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(response, dns.message.from_wire(
|
||||
binascii.a2b_hex(expected_notify_response)))
|
||||
|
@ -72,7 +79,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
|
||||
binascii.a2b_hex(non_auth_notify_response))):
|
||||
response, retry = self.notify.notify_zone_changed(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(response, None)
|
||||
self.assertEqual(retry, 1)
|
||||
|
@ -81,7 +88,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
def test_send_notify_message_timeout(self, _):
|
||||
context = self.get_context()
|
||||
response, retry = self.notify.notify_zone_changed(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255', 0,
|
||||
context, objects.Domain(**self.test_domain), self.server, 0,
|
||||
0, 2, 0)
|
||||
self.assertEqual(response, None)
|
||||
self.assertEqual(retry, 2)
|
||||
|
@ -90,7 +97,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
def test_send_notify_message_bad_response(self, _):
|
||||
context = self.get_context()
|
||||
response, retry = self.notify.notify_zone_changed(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255', 0,
|
||||
context, objects.Domain(**self.test_domain), self.server, 0,
|
||||
0, 2, 0)
|
||||
self.assertEqual(response, None)
|
||||
self.assertEqual(retry, 1)
|
||||
|
@ -115,7 +122,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
|
||||
binascii.a2b_hex(poll_response))):
|
||||
status, serial, retries = self.notify.poll_for_serial_number(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(status, 'SUCCESS')
|
||||
self.assertEqual(serial, self.test_domain['serial'])
|
||||
|
@ -141,7 +148,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
|
||||
binascii.a2b_hex(poll_response))):
|
||||
status, serial, retries = self.notify.poll_for_serial_number(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(status, 'ERROR')
|
||||
self.assertEqual(serial, 99)
|
||||
|
@ -167,7 +174,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
|
||||
binascii.a2b_hex(poll_response))):
|
||||
status, serial, retries = self.notify.poll_for_serial_number(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(status, 'SUCCESS')
|
||||
self.assertEqual(serial, 101)
|
||||
|
@ -177,7 +184,7 @@ class MdnsNotifyTest(MdnsTestCase):
|
|||
def test_poll_for_serial_number_timeout(self, _):
|
||||
context = self.get_context()
|
||||
status, serial, retries = self.notify.poll_for_serial_number(
|
||||
context, objects.Domain(**self.test_domain), '127.0.0.1:65255',
|
||||
context, objects.Domain(**self.test_domain), self.server,
|
||||
0, 0, 2, 0)
|
||||
self.assertEqual(status, 'ERROR')
|
||||
self.assertEqual(serial, None)
|
||||
|
|
|
@ -106,17 +106,19 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
# Ensure notify_zone_changed and poll_for_serial_number
|
||||
# was called for each backend server.
|
||||
self.assertEqual(2, mock_notify_zone_changed.call_count)
|
||||
self.assertEqual([call(self.admin_context, domain,
|
||||
'10.0.0.2:53', 30, 2, 3, 0),
|
||||
call(self.admin_context, domain,
|
||||
'10.0.0.3:53', 30, 2, 3, 0)],
|
||||
mock_notify_zone_changed.call_args_list)
|
||||
self.assertEqual(
|
||||
[call(self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 0),
|
||||
call(self.admin_context, domain,
|
||||
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
|
||||
mock_notify_zone_changed.call_args_list)
|
||||
self.assertEqual(2, mock_poll_for_serial_number.call_count)
|
||||
self.assertEqual([call(self.admin_context, domain,
|
||||
'10.0.0.2:53', 30, 2, 3, 1),
|
||||
call(self.admin_context, domain,
|
||||
'10.0.0.3:53', 30, 2, 3, 1)],
|
||||
mock_poll_for_serial_number.call_args_list)
|
||||
self.assertEqual(
|
||||
[call(self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 1),
|
||||
call(self.admin_context, domain,
|
||||
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
|
||||
mock_poll_for_serial_number.call_args_list)
|
||||
|
||||
@patch.object(impl_fake.FakeBackend, 'create_domain')
|
||||
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
|
||||
|
@ -176,9 +178,11 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
self.assertEqual(0, update_statuses[0].serial_number)
|
||||
|
||||
mock_notify_zone_changed.assert_called_once_with(
|
||||
self.admin_context, domain, '10.0.0.2:53', 30, 2, 3, 0)
|
||||
self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 0)
|
||||
mock_poll_for_serial_number.assert_called_once_with(
|
||||
self.admin_context, domain, '10.0.0.2:53', 30, 2, 3, 1)
|
||||
self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 1)
|
||||
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_delete_domain(self, mock_update_status):
|
||||
|
@ -280,17 +284,19 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
# Ensure notify_zone_changed and poll_for_serial_number
|
||||
# was called for each backend server.
|
||||
self.assertEqual(2, mock_notify_zone_changed.call_count)
|
||||
self.assertEqual([call(self.admin_context, domain,
|
||||
'10.0.0.2:53', 30, 2, 3, 0),
|
||||
call(self.admin_context, domain,
|
||||
'10.0.0.3:53', 30, 2, 3, 0)],
|
||||
mock_notify_zone_changed.call_args_list)
|
||||
self.assertEqual(
|
||||
[call(self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 0),
|
||||
call(self.admin_context, domain,
|
||||
self.service.server_backends[1]['server'], 30, 2, 3, 0)],
|
||||
mock_notify_zone_changed.call_args_list)
|
||||
self.assertEqual(2, mock_poll_for_serial_number.call_count)
|
||||
self.assertEqual([call(self.admin_context, domain,
|
||||
'10.0.0.2:53', 30, 2, 3, 1),
|
||||
call(self.admin_context, domain,
|
||||
'10.0.0.3:53', 30, 2, 3, 1)],
|
||||
mock_poll_for_serial_number.call_args_list)
|
||||
self.assertEqual(
|
||||
[call(self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 30, 2, 3, 1),
|
||||
call(self.admin_context, domain,
|
||||
self.service.server_backends[1]['server'], 30, 2, 3, 1)],
|
||||
mock_poll_for_serial_number.call_args_list)
|
||||
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_update_status(self, mock_update_status):
|
||||
|
@ -313,7 +319,8 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
self.assertEqual(0, update_statuses[1].serial_number)
|
||||
|
||||
self.service.update_status(self.admin_context, domain,
|
||||
'10.0.0.2:53', 'SUCCESS', domain.serial)
|
||||
self.service.server_backends[0]['server'],
|
||||
'SUCCESS', domain.serial)
|
||||
|
||||
update_statuses = self._find_pool_manager_statuses(
|
||||
self.admin_context, 'UPDATE', domain)
|
||||
|
@ -327,7 +334,8 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
self.assertEqual(False, mock_update_status.called)
|
||||
|
||||
self.service.update_status(self.admin_context, domain,
|
||||
'10.0.0.3:53', 'SUCCESS', domain.serial)
|
||||
self.service.server_backends[1]['server'],
|
||||
'SUCCESS', domain.serial)
|
||||
|
||||
update_statuses = self._find_pool_manager_statuses(
|
||||
self.admin_context, 'UPDATE', domain)
|
||||
|
@ -344,8 +352,10 @@ class PoolManagerServiceTest(PoolManagerTestCase):
|
|||
domain = self.create_domain(name='example.org.')
|
||||
|
||||
with testtools.ExpectedException(exceptions.PoolManagerStatusNotFound):
|
||||
self.service.update_status(self.admin_context, domain,
|
||||
'10.0.0.2:53', 'SUCCESS', domain.serial)
|
||||
self.service.update_status(
|
||||
self.admin_context, domain,
|
||||
self.service.server_backends[0]['server'], 'SUCCESS',
|
||||
domain.serial)
|
||||
|
||||
def _find_pool_manager_statuses(self, context, action,
|
||||
domain=None, status=None):
|
||||
|
|
Loading…
Reference in New Issue