From 7fa5789e517fd9ea49e03a2f80013f1409a10636 Mon Sep 17 00:00:00 2001 From: Grzegorz Grasza Date: Thu, 22 Nov 2018 09:57:35 +0100 Subject: [PATCH] Refactor notifications Make a registry of different types of processed events instead of an endless if/elif clause. Change-Id: I34ebdca82810b9abd46a84aca7f1a8febf718be6 --- novajoin/notifications.py | 253 +++++++++++++++++++++----------------- 1 file changed, 143 insertions(+), 110 deletions(-) diff --git a/novajoin/notifications.py b/novajoin/notifications.py index 473dc37..f99aca9 100644 --- a/novajoin/notifications.py +++ b/novajoin/notifications.py @@ -43,6 +43,10 @@ LOG = logging.getLogger(__name__) BACKOFF = 2 +def ipaclient(): + return IPAClient(backoff=BACKOFF) + + def novaclient(): session = get_session() return nova_client.Client('2.1', session=session) @@ -53,6 +57,14 @@ def neutronclient(): return neutron_client.Client(session=session) +class Registry(dict): + def __call__(self, name): + def decorator(fun): + self[name] = fun + return fun + return decorator + + class NotificationEndpoint(object): filter_rule = oslo_messaging.notify.filter.NotificationFilter( @@ -63,15 +75,7 @@ class NotificationEndpoint(object): '^network.floating_ip.(dis)?associate|' '^floatingip.update.end') - def _generate_hostname(self, hostname): - # FIXME: Don't re-calculate the hostname, fetch it from somewhere - project = 'foo' - domain = get_domain() - if CONF.project_subdomain: - host = '%s.%s.%s' % (hostname, project, domain) - else: - host = '%s.%s' % (hostname, domain) - return host + event_handlers = Registry() def info(self, ctxt, publisher_id, event_type, payload, metadata): LOG.debug('notification:') @@ -80,105 +84,124 @@ class NotificationEndpoint(object): LOG.debug("publisher: %s, event: %s, metadata: %s", publisher_id, event_type, metadata) - ipaclient = IPAClient(backoff=BACKOFF) - if event_type == 'compute.instance.create.end': - hostname = self._generate_hostname(payload.get('hostname')) - instance_id = payload.get('instance_id') - LOG.info("Add new host %s (%s)", instance_id, hostname) - elif event_type == 'compute.instance.update': - join_controller = join.JoinController(ipaclient) - hostname_short = payload.get('hostname') - instance_id = payload.get('instance_id') - payload_metadata = payload.get('metadata') - image_metadata = payload.get('image_meta') + event_handler = self.event_handlers.get( + event_type, lambda payload: LOG.error("Status update or unknown")) + # run event handler for received notification type + event_handler(self, payload) - hostname = self._generate_hostname(hostname_short) + @event_handlers('compute.instance.create.end') + def instance_create(self, payload): + hostname = self._generate_hostname(payload.get('hostname')) + instance_id = payload.get('instance_id') + LOG.info("Add new host %s (%s)", instance_id, hostname) - enroll = payload_metadata.get('ipa_enroll', '') - image_enroll = image_metadata.get('ipa_enroll', '') - if enroll.lower() != 'true' and image_enroll.lower() != 'true': - LOG.info('IPA enrollment not requested, skipping update of %s', - hostname) - return - # Ensure this instance exists in nova - instance = get_instance(instance_id) - if instance is None: - msg = 'No such instance-id, %s' % instance_id - LOG.error(msg) - return + @event_handlers('compute.instance.update') + def instance_update(self, payload): + ipa = ipaclient() + join_controller = join.JoinController(ipa) + hostname_short = payload.get('hostname') + instance_id = payload.get('instance_id') + payload_metadata = payload.get('metadata') + image_metadata = payload.get('image_meta') - ipaclient.start_batch_operation() - # key-per-service - managed_services = [ - payload_metadata[key] for key in payload_metadata.keys() - if key.startswith('managed_service_')] - if managed_services: - join_controller.handle_services(hostname, managed_services) - # compact json format - if 'compact_services' in payload_metadata: - join_controller.handle_compact_services( - hostname_short, payload_metadata.get('compact_services')) - ipaclient.flush_batch_operation() - elif event_type == 'compute.instance.delete.end': - hostname_short = payload.get('hostname') - instance_id = payload.get('instance_id') - payload_metadata = payload.get('metadata') - image_metadata = payload.get('image_meta') + hostname = self._generate_hostname(hostname_short) - hostname = self._generate_hostname(hostname_short) + enroll = payload_metadata.get('ipa_enroll', '') + image_enroll = image_metadata.get('ipa_enroll', '') + if enroll.lower() != 'true' and image_enroll.lower() != 'true': + LOG.info('IPA enrollment not requested, skipping update of %s', + hostname) + return + # Ensure this instance exists in nova + instance = get_instance(instance_id) + if instance is None: + msg = 'No such instance-id, %s' % instance_id + LOG.error(msg) + return - enroll = payload_metadata.get('ipa_enroll', '') - image_enroll = image_metadata.get('ipa_enroll', '') + ipa.start_batch_operation() + # key-per-service + managed_services = [ + payload_metadata[key] for key in payload_metadata.keys() + if key.startswith('managed_service_')] + if managed_services: + join_controller.handle_services(hostname, managed_services) + # compact json format + if 'compact_services' in payload_metadata: + join_controller.handle_compact_services( + hostname_short, payload_metadata.get('compact_services')) + ipa.flush_batch_operation() - if enroll.lower() != 'true' and image_enroll.lower() != 'true': - LOG.info('IPA enrollment not requested, skipping delete of %s', - hostname) - return + @event_handlers('compute.instance.delete.end') + def instance_delete(self, payload): + hostname_short = payload.get('hostname') + instance_id = payload.get('instance_id') + payload_metadata = payload.get('metadata') + image_metadata = payload.get('image_meta') - LOG.info("Delete host %s (%s)", instance_id, hostname) - ipaclient.delete_host(hostname, {}) - self.delete_subhosts(ipaclient, hostname_short, payload_metadata) - elif event_type == 'network.floating_ip.associate': - floating_ip = payload.get('floating_ip') - LOG.info("Associate floating IP %s" % floating_ip) - nova = novaclient() - server = nova.servers.get(payload.get('instance_id')) - if server: - ipaclient.add_ip(server.get, floating_ip) - else: - LOG.error("Could not resolve %s into a hostname", - payload.get('instance_id')) - elif event_type == 'network.floating_ip.disassociate': - floating_ip = payload.get('floating_ip') - LOG.info("Disassociate floating IP %s" % floating_ip) - nova = novaclient() - server = nova.servers.get(payload.get('instance_id')) - if server: - ipaclient.remove_ip(server.name, floating_ip) - else: - LOG.error("Could not resolve %s into a hostname", - payload.get('instance_id')) - elif event_type == 'floatingip.update.end': # Neutron - floatingip = payload.get('floatingip') - floating_ip = floatingip.get('floating_ip_address') - port_id = floatingip.get('port_id') - LOG.info("Neutron floating IP associate: %s" % floating_ip) - nova = novaclient() - neutron = neutronclient() - search_opts = {'id': port_id} - ports = neutron.list_ports(**search_opts).get('ports') - if len(ports) == 1: - device_id = ports[0].get('device_id') - if device_id: - server = nova.servers.get(device_id) - if server: - ipaclient.add_ip(server.name, floating_ip) - else: - LOG.error("Expected 1 port, got %d", len(ports)) + hostname = self._generate_hostname(hostname_short) + + enroll = payload_metadata.get('ipa_enroll', '') + image_enroll = image_metadata.get('ipa_enroll', '') + + if enroll.lower() != 'true' and image_enroll.lower() != 'true': + LOG.info('IPA enrollment not requested, skipping delete of %s', + hostname) + return + + LOG.info("Delete host %s (%s)", instance_id, hostname) + ipa = ipaclient() + ipa.delete_host(hostname, {}) + self.delete_subhosts(ipa, hostname_short, payload_metadata) + + @event_handlers('network.floating_ip.associate') + def floaitng_ip_associate(self, payload): + floating_ip = payload.get('floating_ip') + LOG.info("Associate floating IP %s" % floating_ip) + ipa = ipaclient() + nova = novaclient() + server = nova.servers.get(payload.get('instance_id')) + if server: + ipa.add_ip(server.get, floating_ip) else: - LOG.error("Status update or unknown") + LOG.error("Could not resolve %s into a hostname", + payload.get('instance_id')) - def delete_subhosts(self, ipaclient, hostname_short, metadata): + @event_handlers('network.floating_ip.disassociate') + def floating_ip_disassociate(self, payload): + floating_ip = payload.get('floating_ip') + LOG.info("Disassociate floating IP %s" % floating_ip) + ipa = ipaclient() + nova = novaclient() + server = nova.servers.get(payload.get('instance_id')) + if server: + ipa.remove_ip(server.name, floating_ip) + else: + LOG.error("Could not resolve %s into a hostname", + payload.get('instance_id')) + + @event_handlers('floatingip.update.end') + def floating_ip_update(self, payload): + """Neutron event""" + floatingip = payload.get('floatingip') + floating_ip = floatingip.get('floating_ip_address') + port_id = floatingip.get('port_id') + LOG.info("Neutron floating IP associate: %s" % floating_ip) + ipa = ipaclient() + nova = novaclient() + neutron = neutronclient() + search_opts = {'id': port_id} + ports = neutron.list_ports(**search_opts).get('ports') + if len(ports) == 1: + device_id = ports[0].get('device_id') + if device_id: + server = nova.servers.get(device_id) + if server: + ipa.add_ip(server.name, floating_ip) + else: + LOG.error("Expected 1 port, got %d", len(ports)) + + def delete_subhosts(self, ipa, hostname_short, metadata): """Delete subhosts and remove VIPs if possible. Servers can have multiple network interfaces, and therefore can @@ -199,14 +222,14 @@ class NotificationEndpoint(object): return if 'compact_services' in metadata: - self.handle_compact_services(ipaclient, hostname_short, + self.handle_compact_services(ipa, hostname_short, metadata.get('compact_services')) managed_services = [metadata[key] for key in metadata.keys() if key.startswith('managed_service_')] if managed_services: - self.handle_managed_services(ipaclient, managed_services) + self.handle_managed_services(ipa, managed_services) - def handle_compact_services(self, ipaclient, host_short, + def handle_compact_services(self, ipa, host_short, service_repr_json): """Reconstructs and removes subhosts for compact services. @@ -225,7 +248,7 @@ class NotificationEndpoint(object): service_repr = json.loads(service_repr_json) hosts_found = list() - ipaclient.start_batch_operation() + ipa.start_batch_operation() for service_name, net_list in service_repr.items(): for network in net_list: host = "%s.%s" % (host_short, network) @@ -233,11 +256,11 @@ class NotificationEndpoint(object): # remove host if principal_host not in hosts_found: - ipaclient.delete_subhost(principal_host) + ipa.delete_subhost(principal_host) hosts_found.append(principal_host) - ipaclient.flush_batch_operation() + ipa.flush_batch_operation() - def handle_managed_services(self, ipaclient, services): + def handle_managed_services(self, ipa, services): """Delete any managed services if possible. Checks to see if the managed service subhost has no managed hosts @@ -250,19 +273,29 @@ class NotificationEndpoint(object): for principal in services: if principal not in services_deleted: try: - if ipaclient.service_has_hosts(principal): + if ipa.service_has_hosts(principal): continue except KeyError: continue - ipaclient.delete_service(principal, batch=False) + ipa.delete_service(principal, batch=False) services_deleted.append(principal) principal_host = principal.split('/', 1)[1] if principal_host not in hosts_deleted: - if not ipaclient.host_has_services(principal_host): - ipaclient.delete_subhost(principal_host, batch=False) + if not ipa.host_has_services(principal_host): + ipa.delete_subhost(principal_host, batch=False) hosts_deleted.append(principal_host) + def _generate_hostname(self, hostname): + # FIXME: Don't re-calculate the hostname, fetch it from somewhere + project = 'foo' + domain = get_domain() + if CONF.project_subdomain: + host = '%s.%s.%s' % (hostname, project, domain) + else: + host = '%s.%s' % (hostname, domain) + return host + def main(): register_keystoneauth_opts(CONF)