From ac53cd4df00acde0615f25fd0a990d2eb994a213 Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Sun, 22 Sep 2019 00:31:33 -0700 Subject: [PATCH] Simplified network api implementation Reduced the complexity of the neutron code for looking up ptr records. * Limit max concurrency to 5. * Better error handling. Change-Id: I0c50aaadaabb6e0f054121d8270fcb8e05b6e762 --- designate/network_api/neutron.py | 88 ++++++++++++++++---------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/designate/network_api/neutron.py b/designate/network_api/neutron.py index 359ea196e..91e5bc962 100644 --- a/designate/network_api/neutron.py +++ b/designate/network_api/neutron.py @@ -14,16 +14,15 @@ # under the License. # # Copied partially from nova -from neutronclient.v2_0 import client as clientv20 +import concurrent.futures from neutronclient.common import exceptions as neutron_exceptions +from neutronclient.v2_0 import client as clientv20 from oslo_config import cfg from oslo_log import log as logging -from oslo_service import threadgroup from designate import exceptions from designate.network_api import base - CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -63,52 +62,51 @@ class NeutronNetworkAPI(base.NetworkAPI): service_type='network', endpoint_type=CONF['network_api:neutron'].endpoint_type, config_section='network_api:neutron', - region=region) + region=region + ) - tg = threadgroup.ThreadGroup() + floating_ips = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + executors = [ + executor.submit( + self._get_floating_ips, + context, + endpoint, + region, + project_id=context.project_id + ) for endpoint, region in endpoints + ] + for future in concurrent.futures.as_completed(executors): + try: + floating_ips.extend(future.result()) + except Exception as e: + raise exceptions.NeutronCommunicationFailure(e) - failed = [] - data = [] - - def _call(endpoint, region, *args, **kw): - client = get_client(context, endpoint=endpoint) - LOG.debug("Attempting to fetch FloatingIPs from %s @ %s", - endpoint, region) - try: - fips = client.list_floatingips(*args, **kw) - except neutron_exceptions.Unauthorized as e: - # NOTE: 401 might be that the user doesn't have neutron - # activated in a particular region, we'll just log the failure - # and go on with our lives. - LOG.warning("Calling Neutron resulted in a 401, " - "please investigate.") - LOG.exception(e) - return - except Exception as e: - LOG.error('Failed calling Neutron %(region)s - %(endpoint)s', - {'region': region, 'endpoint': endpoint}) - LOG.exception(e) - failed.append((e, endpoint, region)) - return + return floating_ips + @staticmethod + def _get_floating_ips(context, endpoint, region, project_id): + LOG.debug('Fetching floating ips from %(region)s @ %(endpoint)s', + {'region': region, 'endpoint': endpoint}) + client = get_client(context, endpoint=endpoint) + try: + fips = client.list_floatingips(project_id=project_id) for fip in fips['floatingips']: - data.append({ + yield { 'id': fip['id'], 'address': fip['floating_ip_address'], 'region': region - }) - - LOG.debug("Added %i FloatingIPs from %s @ %s", - len(data), endpoint, region) - - for endpoint, region in endpoints: - tg.add_thread(_call, endpoint, region, - project_id=context.project_id) - tg.wait() - - # NOTE: Sadly tg code doesn't give us a good way to handle failures. - if failed: - msg = 'Failed retrieving FloatingIPs from Neutron in %s' % \ - ", ".join(['%s - %s' % (i[1], i[2]) for i in failed]) - raise exceptions.NeutronCommunicationFailure(msg) - return data + } + except neutron_exceptions.Unauthorized: + LOG.warning( + 'Failed fetching floating ips from %(region)s @ %(endpoint)s' + 'due to an Unauthorized error', + {'region': region, 'endpoint': endpoint} + ) + except Exception: + LOG.error( + 'Failed fetching floating ips from %(region)s @ %(endpoint)s', + {'region': region, 'endpoint': endpoint} + ) + raise + return