diff --git a/designate/network_api/neutron.py b/designate/network_api/neutron.py index 359ea196e..91e5bc962 100644 --- a/designate/network_api/neutron.py +++ b/designate/network_api/neutron.py @@ -14,16 +14,15 @@ # under the License. # # Copied partially from nova -from neutronclient.v2_0 import client as clientv20 +import concurrent.futures from neutronclient.common import exceptions as neutron_exceptions +from neutronclient.v2_0 import client as clientv20 from oslo_config import cfg from oslo_log import log as logging -from oslo_service import threadgroup from designate import exceptions from designate.network_api import base - CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -63,52 +62,51 @@ class NeutronNetworkAPI(base.NetworkAPI): service_type='network', endpoint_type=CONF['network_api:neutron'].endpoint_type, config_section='network_api:neutron', - region=region) + region=region + ) - tg = threadgroup.ThreadGroup() + floating_ips = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + executors = [ + executor.submit( + self._get_floating_ips, + context, + endpoint, + region, + project_id=context.project_id + ) for endpoint, region in endpoints + ] + for future in concurrent.futures.as_completed(executors): + try: + floating_ips.extend(future.result()) + except Exception as e: + raise exceptions.NeutronCommunicationFailure(e) - failed = [] - data = [] - - def _call(endpoint, region, *args, **kw): - client = get_client(context, endpoint=endpoint) - LOG.debug("Attempting to fetch FloatingIPs from %s @ %s", - endpoint, region) - try: - fips = client.list_floatingips(*args, **kw) - except neutron_exceptions.Unauthorized as e: - # NOTE: 401 might be that the user doesn't have neutron - # activated in a particular region, we'll just log the failure - # and go on with our lives. - LOG.warning("Calling Neutron resulted in a 401, " - "please investigate.") - LOG.exception(e) - return - except Exception as e: - LOG.error('Failed calling Neutron %(region)s - %(endpoint)s', - {'region': region, 'endpoint': endpoint}) - LOG.exception(e) - failed.append((e, endpoint, region)) - return + return floating_ips + @staticmethod + def _get_floating_ips(context, endpoint, region, project_id): + LOG.debug('Fetching floating ips from %(region)s @ %(endpoint)s', + {'region': region, 'endpoint': endpoint}) + client = get_client(context, endpoint=endpoint) + try: + fips = client.list_floatingips(project_id=project_id) for fip in fips['floatingips']: - data.append({ + yield { 'id': fip['id'], 'address': fip['floating_ip_address'], 'region': region - }) - - LOG.debug("Added %i FloatingIPs from %s @ %s", - len(data), endpoint, region) - - for endpoint, region in endpoints: - tg.add_thread(_call, endpoint, region, - project_id=context.project_id) - tg.wait() - - # NOTE: Sadly tg code doesn't give us a good way to handle failures. - if failed: - msg = 'Failed retrieving FloatingIPs from Neutron in %s' % \ - ", ".join(['%s - %s' % (i[1], i[2]) for i in failed]) - raise exceptions.NeutronCommunicationFailure(msg) - return data + } + except neutron_exceptions.Unauthorized: + LOG.warning( + 'Failed fetching floating ips from %(region)s @ %(endpoint)s' + 'due to an Unauthorized error', + {'region': region, 'endpoint': endpoint} + ) + except Exception: + LOG.error( + 'Failed fetching floating ips from %(region)s @ %(endpoint)s', + {'region': region, 'endpoint': endpoint} + ) + raise + return