# Copyright 2013, 2014 IBM Corp.
#
# from cinderclient.v1 import client
import cinder.db.sqlalchemy.models
import sys
import time
import logging

from oslo.config import cfg

from cinder import db
from cinder import context
from cinder import service as taskservice
from cinder.openstack.common import service
from cinder import quota

from oslo_log import log

from powervc.common import config
from powervc.common.gettextutils import _
from powervc.volume.manager import constants
from powervc.volume.driver import service as pvcservice
from powervc.common import utils
from powervc.common.client import delegate as ctx_delegate
from powervc.common import messaging

CONF = config.CONF
LOG = log.getLogger(__name__)

QUOTAS = quota.QUOTAS

volume_sync_opts = [
    cfg.IntOpt('volume_sync_interval',
               default=20,
               help=_('Volume periodic sync interval specified in '
                      'seconds.')),
    cfg.IntOpt('full_volume_sync_frequency',
               default=30,
               help=_('How many volume sync intervals between full volume '
                      'syncs. Only volumes known to be out of sync are '
                      'synced on the interval except after this many '
                      'intervals when all volumes are synced.')),
    cfg.IntOpt('volume_type_sync_interval',
               default=20,
               help=_('Volume type periodic sync interval specified in '
                      'seconds.')),
    cfg.IntOpt('full_volume_type_sync_frequency',
               default=30,
               help=_('How many volume type sync intervals between full '
                      'volume type syncs. Only volume types known to be '
                      'out of sync are synced on the interval except after '
                      'this many intervals when all volume types are '
                      'synced.')),
]

CONF.register_opts(volume_sync_opts, group='powervc')


class PowerVCCinderManager(service.Service):

    """
    Manages the synchronization of volume types and volumes between
    PowerVC and the local hosting OS.
    """

    def __init__(self):
        """Constructor."""
        super(PowerVCCinderManager, self).__init__()
        self._load_power_config(sys.argv)
        self._service = pvcservice.PowerVCService()
        ctx = self._get_context()
        self._staging_cache = utils.StagingCache()

        if not utils.get_utils().validate_scgs():
            LOG.error(_('Cinder-powervc service terminated: invalid '
                        'Storage Connectivity Group specified.'))
            sys.exit(1)

        # Keep track of whether or not we need to sync all volume types on
        # the next volume type sync interval.
        self.full_volume_type_sync_required = False
        self.full_volume_sync_required = False
        self.sync_volume_types = {}
        self.sync_volumes = {}

        # Delete volumes first! This also tries to delete volume types
        # that are no longer referenced.
        self._synchronize_volumes(ctx)
        self._synchronize_volume_types(ctx)

        # Uncomment the line below to start cinder-volume along with
        # cinder-powervc.
        # self.start_volume_service()

        # Listen for out-of-band PowerVC changes
        self._create_powervc_listeners(ctx)

        # Set up periodic polling to sync volume types and volumes
        self._start_periodic_volume_type_sync(ctx)
        self._start_periodic_volume_sync(ctx)

    def _get_context(self):
        # Lazily import the factory to avoid connecting to the environment
        # when this module is loaded.
        from powervc.common.client import factory
        keystone = factory.LOCAL.keystone
        orig_ctx = context.get_admin_context()
        orig_ctx.project_id = keystone.tenant_id
        orig_ctx.user_id = keystone.user_id
        return ctx_delegate.context_dynamic_auth_token(orig_ctx, keystone)

    def _load_power_config(self, argv):
        """
        Load the powervc config.
        """
        # Cinder is typically started with the --config-file option.
        # This prevents the default config files from loading since
        # the oslo config code will only load those config files as
        # specified on the command line.
        # If cinder is started with the --config-file option then append
        # our powervc.conf file to the command line so it gets loaded as
        # well.
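        # Example (the paths below are only illustrative; the powervc.conf
        # actually used is whatever cfg.find_config_files() returns first):
        # a command line such as
        #     cinder-powervc --config-file /etc/cinder/cinder.conf
        # is effectively extended to
        #     cinder-powervc --config-file /etc/cinder/cinder.conf \
        #                    --config-file /etc/powervc/powervc.conf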
        for arg in argv:
            if arg == '--config-file' or arg.startswith('--config-file='):
                argv[len(argv):] = ["--config-file"] + \
                    [cfg.find_config_files(project='powervc',
                                           prog='powervc')[0]]
                break

        config.parse_power_config(argv, 'cinder')
        CONF.log_opt_values(LOG, logging.INFO)

    def start_volume_service(self):
        """
        Create and start a cinder-volume service.
        """
        if CONF.enabled_backends:
            for backend in CONF.enabled_backends:
                host = "%s@%s" % (CONF.host, backend)
                self.volume_service = \
                    taskservice.Service.create(host=host,
                                               service_name=backend)
        else:
            self.volume_service = \
                taskservice.Service.create(binary='cinder-volume')
        self.volume_service.start()

    def _create_powervc_listeners(self, ctx):
        """Listen for out-of-band changes made in PowerVC.

        This method creates the listener to the PowerVC AMQP broker and
        sets up handlers so that any changes made directly in PowerVC are
        reflected in the local OS.

        :param: ctx The security context
        """
        LOG.debug("Enter _create_powervc_listeners method")
        endpoint = messaging.NotificationEndpoint(log=LOG, sec_context=ctx)

        # Volume type creation
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_TYPE_CREATE)
        endpoint.register_handler(constants.EVENT_VOLUME_TYPE_CREATE,
                                  self._handle_powervc_volume_type_create)

        # Volume type deletion
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_TYPE_DELETE)
        endpoint.register_handler(constants.EVENT_VOLUME_TYPE_DELETE,
                                  self._handle_powervc_volume_type_delete)

        # Volume type extra spec changes
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE)
        endpoint.register_handler(
            [constants.EVENT_VOLUME_TYPE_EXTRA_SPECS_UPDATE],
            self._handle_powervc_volume_type_extra_spec_update)

        # Volume creation
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_CREATE_END)
        endpoint.register_handler([constants.EVENT_VOLUME_CREATE_END],
                                  self._handle_powervc_volume_create)

        # Volume import
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_IMPORT_END)
        endpoint.register_handler([constants.EVENT_VOLUME_IMPORT_END],
                                  self._handle_powervc_volume_create)

        # Volume deletion
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_DELETE_END)
        endpoint.register_handler([constants.EVENT_VOLUME_DELETE_END],
                                  self._handle_powervc_volume_delete)

        # Volume update
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_UPDATE)
        endpoint.register_handler([constants.EVENT_VOLUME_UPDATE],
                                  self._handle_powervc_volume_update)

        # Volume attach
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_ATTACH_END)
        endpoint.register_handler([constants.EVENT_VOLUME_ATTACH_END],
                                  self._handle_powervc_volume_update)

        # Volume detach
        LOG.debug(_("Register event handler for %s event ") %
                  constants.EVENT_VOLUME_DETACH_END)
        endpoint.register_handler([constants.EVENT_VOLUME_DETACH_END],
                                  self._handle_powervc_volume_update)

        endpoints = [
            endpoint,
        ]

        LOG.debug("Starting to listen...... ")
        messaging.start_listener(config.AMQP_POWERVC_CONF,
                                 constants.AMQP_EXCHANGE,
                                 constants.AMQP_TOPIC,
                                 endpoints)
        LOG.debug("Exit _create_powervc_listeners method")

    def _periodic_volume_type_sync(self, context, vol_type_ids=None):
        """
        Called to synchronize volume types after the initial boot sync.

        This does almost the same thing as the synchronize that happens on
        boot, except this function will check that the volume types meet
        certain requirements before adding, removing, or updating them
        locally.

        :param: context The security context
        :param: vol_type_ids List of PowerVC volume type IDs to sync
        """
        LOG.info(_("Starting volume type synchronization..."))
        # Some counters to record the volume types modified
        count_new_vol_types = 0
        count_updated_vol_types = 0
        count_deleted_vol_types = 0

        # If a list of volume type IDs is passed in then this is a targeted
        # sync operation and not a full sync.
        is_full_sync = not vol_type_ids

        # If this is a full sync then reset the marked volume types map,
        # otherwise just remove the volume types we are about to update.
        # Do this up front so that we minimize the likelihood of losing any
        # volume types that might get marked during the sync operation.
        if is_full_sync:
            self.sync_volume_types = {}
        else:
            for vol_type_id in vol_type_ids:
                del self.sync_volume_types[vol_type_id]

        # Get both lists from the local DB and PowerVC
        pvc_vol_types = {}
        local_vol_types = {}
        if is_full_sync:
            pvc_vol_types = self._service.list_volume_types()
            local_vol_types = self._get_all_local_pvc_volume_types(context)
        else:
            pvc_vol_types = [self._service.get_volume_type(x)
                             for x in vol_type_ids]

        # Sync from PowerVC to the local cinder DB, to insert new volume
        # types and update existing ones.
        for index, pvc_vol_type in enumerate(pvc_vol_types):
            # If we are syncing a set of given PowerVC volume type IDs then
            # we first check if the PowerVC volume type exists. If it does
            # not, we attempt to delete the corresponding local volume type
            # and move on.
            if not is_full_sync and pvc_vol_type is None:
                matched_vol_types = self._get_local_volume_type_by_pvc_id(
                    context, vol_type_ids[index])
                for local_vol_type in matched_vol_types:
                    if self._unregister_volume_types(
                            context, local_vol_type.get('id')):
                        count_deleted_vol_types += 1
                continue

            # Convert the PowerVC volume type object to a dictionary
            pvc_volume_type = pvc_vol_type.__dict__

            matched_vol_types = self._get_local_volume_type_by_pvc_id(
                context, pvc_volume_type.get('id'))

            # If not found locally then try to add the new volume type
            if len(matched_vol_types) == 0:
                if self._insert_pvc_volume_type(context, pvc_volume_type):
                    count_new_vol_types += 1
                continue

            if len(matched_vol_types) > 1:
                LOG.warning('More than one local volume type matches one '
                            'PowerVC volume type: %s' %
                            (pvc_volume_type.get('id')))
            local_vol_type = matched_vol_types[0]

            # The local volume type exists, so try to update it
            if self._sync_existing_volume_type(context, local_vol_type,
                                               pvc_volume_type):
                count_updated_vol_types += 1

        # Sync from the local cinder DB to PowerVC, to remove volume types
        # that are no longer in PowerVC. This only happens during a full
        # sync of all volume types.
        for local_vol_type in local_vol_types:
            if not self._is_valid_pvc_volume_type(
                    context, local_vol_types[local_vol_type],
                    pvc_vol_types):
                if self._unregister_volume_types(
                        context,
                        local_vol_types[local_vol_type].get('id')):
                    count_deleted_vol_types += 1

        LOG.info("""
                 *******************************
                 Volume type sync. is complete.
                 Full sync: %(full_sync)s
                 [ %(insert)s inserted,
                   %(update)s updated,
                   %(delete)s deleted ]
                 *******************************
                 """ %
                 {'full_sync': is_full_sync,
                  'insert': count_new_vol_types,
                  'update': count_updated_vol_types,
                  'delete': count_deleted_vol_types})

    def _start_periodic_volume_type_sync(self, context):
        """
        Initialize the periodic syncing of volume types from PowerVC into
        the local OS. The powervc.volume_type_sync_interval config property
        determines how often the sync will occur, and the
        powervc.full_volume_type_sync_frequency config property determines
        the number of marked volume type sync operations between full
        volume type syncs.

        :param: context The security context
        """
        # Enforce some minimum values for the sync interval properties
        # TODO: Minimum values should at least be documented
        conf_sync = CONF.powervc.volume_type_sync_interval
        conf_full_sync = CONF.powervc.full_volume_type_sync_frequency
        sync_interval = conf_sync if conf_sync > 10 else 10
        full_sync_frequency = conf_full_sync if conf_full_sync > 2 else 2
        self._volume_type_sync_counter = 0

        def sync():
            """Called on the volume type sync intervals"""
            self._volume_type_sync_counter += 1
            try:
                # Check if it's time to do a full sync
                if self.full_volume_type_sync_required or \
                        self._volume_type_sync_counter == \
                        full_sync_frequency:
                    self.full_volume_type_sync_required = False
                    self._volume_type_sync_counter = 0
                    LOG.debug('Syncing all volume types on interval')
                    self._periodic_volume_type_sync(context)
                    return

                # If there are no marked volume types to sync, stop here
                vol_type_ids = self.sync_volume_types.keys()
                if len(vol_type_ids) == 0:
                    return

                LOG.debug('Syncing marked volume types')
                self._periodic_volume_type_sync(context,
                                                vol_type_ids=vol_type_ids)
            except Exception as e:
                LOG.exception(_("Error occurred during volume type "
                                "synchronization: %s"), str(e))
                LOG.info(_("Volume type synchronization will occur at the "
                           "next scheduled interval."))

        self.tg.add_timer(sync_interval, sync)

    def _handle_powervc_volume_type_create(self, context=None, ctxt=None,
                                           event_type=None, payload=None):
        """Handle volume type create messages sent from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        vol_type = payload.get('volume_types')
        if vol_type is None:
            LOG.warning("Null volume type in volume type create "
                        "notification")
            return
        pvc_vol_type_id = vol_type.get('id')
        if pvc_vol_type_id is None:
            LOG.warning("Null volume type id in volume type create "
                        "notification")
            return

        # Check for a matching local volume type
        matched_vol_types = self.\
            _get_local_volume_type_by_pvc_id(context, pvc_vol_type_id)

        # If the volume type already exists locally then ignore the event
        if len(matched_vol_types) > 0:
            LOG.debug('Volume type already exists locally')
            return

        # Filter out volume types whose storage connectivity group is not
        # specified in powervc.conf
        extra_specs = getattr(vol_type, 'extra_specs', {})
        # Condition 1: the volume type has no extra_specs, so add it
        if not extra_specs:
            LOG.info(_("No extra_specs in storage template, just add"))
            self._insert_pvc_volume_type(context, vol_type)
        else:
            volume_backend_name = extra_specs.get(
                'capabilities:volume_backend_name', '')
            # Condition 2: extra_specs has no volume_backend_name, return
            if not volume_backend_name:
                LOG.info(_('No volume_backend_name specified, return'))
                return
            accessible_storage_providers = utils.get_utils().\
                get_multi_scg_accessible_storage_providers(None, None)
            if not accessible_storage_providers:
                LOG.info(_("No accessible_storage_providers, return"))
                return
            # Condition 3: the extra_specs volume_backend_name matches an
            # accessible storage provider's storage_hostname, so add it
            for storage_provider in accessible_storage_providers:
                storage_hostname = getattr(storage_provider,
                                           'storage_hostname', '')
                if volume_backend_name == storage_hostname:
                    self._insert_pvc_volume_type(context, vol_type)

    def _handle_powervc_volume_type_delete(self, context=None, ctxt=None,
                                           event_type=None, payload=None):
        """Handle volume type delete messages sent from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        vol_type = payload.get('volume_types')
        if vol_type is None:
            LOG.warning("Null volume type, ignore volume type delete "
                        "notification")
            return
        pvc_vol_type_id = vol_type.get('id')
        if pvc_vol_type_id is None:
            LOG.warning("Null volume type id, ignore volume type delete "
                        "notification")
            return

        # Check for a matching local volume type
        matched_vol_types = self.\
            _get_local_volume_type_by_pvc_id(context, pvc_vol_type_id)

        # If the volume type does not exist locally then ignore the event
        if len(matched_vol_types) == 0:
            LOG.debug('Volume type does not exist locally')
            return

        # Remove the volume type from the local OS
        self._unregister_volume_types(context, pvc_vol_type_id)

    def _handle_powervc_volume_type_extra_spec_update(self, context=None,
                                                      ctxt=None,
                                                      event_type=None,
                                                      payload=None):
        """Handle volume type extra spec update messages sent from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        pvc_vol_type_id = payload.get('type_id')
        if pvc_vol_type_id is None:
            LOG.debug('Null volume type id, ignore extra specs update')
            return

        # Get the matching local volume type
        matched_vol_types = self.\
            _get_local_volume_type_by_pvc_id(context, pvc_vol_type_id)

        # If the volume type does not exist locally then ignore the event;
        # defer the insert to the periodic sync
        if len(matched_vol_types) == 0:
            LOG.debug('Volume type does not exist locally')
            return

        # Warn if more than one local volume type matches the PowerVC
        # volume type
        if len(matched_vol_types) > 1:
            LOG.warning('More than one volume type in DB matches one '
                        'PowerVC volume type: %s' % (pvc_vol_type_id))
            # TODO: We should do something about this but scheduling a sync
            # won't help since that does nothing to remove duplicate local
            # volume types.

        # Get the PowerVC volume type so we can compare it to the local one
        local_vol_type = matched_vol_types[0]
        pvc_volume_type = self._service.get_volume_type(pvc_vol_type_id)

        # Warn if the PowerVC volume type is not found
        if pvc_volume_type is None:
            LOG.warning('PowerVC volume type could not be found: %s' %
                        (pvc_vol_type_id))
            return

        self._sync_existing_volume_type(context, local_vol_type,
                                        pvc_volume_type.__dict__)

    def _synchronize_volumes(self, context):
        """
        Synchronize volumes.
        """
        local_volumes = self._get_all_local_volumes(context)
        pvc_volumes = self._service.get_volumes()
        self._synchronize_volumes_ex(context, local_volumes, pvc_volumes)

    def _delete_unused_volume_types(self, context, local_volume_types,
                                    pvc_volume_types):
        """
        Delete local volume types that are no longer in PowerVC.
        """
        if local_volume_types is None:
            local_volume_types = \
                self._get_all_local_pvc_volume_types(context)
        if pvc_volume_types is None:
            pvc_volume_types = self._service.list_volume_types()

        count_deleted_volume_types = 0
        for local_volume_type in local_volume_types:
            if not self._is_valid_pvc_volume_type(
                    context, local_volume_types[local_volume_type],
                    pvc_volume_types):
                # If it is no longer valid in PowerVC, delete it locally too
                self._unregister_volume_types(
                    context,
                    local_volume_types[local_volume_type].get('id'))
                count_deleted_volume_types += 1

        return count_deleted_volume_types

    def _synchronize_volume_types(self, context):
        """
        Synchronize volume types.
        """
        # Some counters to record the volume types modified.
        count_new_volume_types = 0
        count_updated_volume_types = 0

        local_volume_types = self._get_all_local_pvc_volume_types(context)
        pvc_volume_types = self._service.list_volume_types()

        # Sync from the local cinder DB ---> PowerVC,
        # to remove volume types that are not in PowerVC anymore.
        count_deleted_volume_types = (
            self._delete_unused_volume_types(context,
                                             local_volume_types,
                                             pvc_volume_types))

        # Sync from PowerVC ---> the local cinder DB,
        # to insert new volume types and update existing ones
        for volume_type in pvc_volume_types:
            # Convert the object to a dictionary because some field names
            # contain spaces.
            pvc_volume_type = volume_type.__dict__

            matched_volume_types = self.\
                _get_matched_volume_type_by_pvc_id(
                    local_volume_types, pvc_volume_type.get('id'))
            if len(matched_volume_types) == 0:
                # Not found locally, so insert the new volume type
                self._insert_pvc_volume_type(context, pvc_volume_type)
                count_new_volume_types += 1
            else:
                # Found locally, so update it
                if len(matched_volume_types) > 1:
                    LOG.warning("More than one volume type in DB matches "
                                "one PowerVC volume type: " +
                                pvc_volume_type.get('id'))
                self._sync_existing_volume_type(context,
                                                matched_volume_types[0],
                                                pvc_volume_type)
                count_updated_volume_types += 1

        LOG.info("""
                 *******************************
                 Initial volume type sync. is complete.
                 [ %(insert)s inserted,
                   %(update)s updated,
                   %(delete)s deleted ]
                 *******************************
                 """ %
                 {'insert': count_new_volume_types,
                  'update': count_updated_volume_types,
                  'delete': count_deleted_volume_types})

    def _get_all_local_pvc_volume_types(self, context):
        """Get all local volume types that are mapped from PowerVC."""
        all_types = db.volume_type_get_all(context)

        filters = []
        # Filter out non-PowerVC volume types
        for each in all_types:
            name = all_types[each]['name']
            if not name.startswith(constants.LOCAL_PVC_VOLUME_TYPE_PREFIX):
                filters.append(name)
        for name in filters:
            del all_types[name]
        return all_types

    def _get_local_volume_type_by_pvc_id(self, context, pvcid):
        """Get the local volume types that match a PowerVC uuid."""
        local_volume_types = self._get_all_local_pvc_volume_types(context)
        return self._get_matched_volume_type_by_pvc_id(local_volume_types,
                                                       pvcid)

    def _get_matched_volume_type_by_pvc_id(self, local_volume_types, pvcid):
        """Get the local volume types that match a PowerVC uuid."""
        matches = []
        for item in local_volume_types:
            volume_type_id = local_volume_types[item].get('id')
            if volume_type_id == pvcid:
                matches.append(local_volume_types[item])
        return matches

    def _mask_pvc_volume_type_name(self, pvc_volume_type_name,
                                   storage_backend):
        if pvc_volume_type_name is None:
            return pvc_volume_type_name
        if storage_backend is None:
            storage_backend = ''
        return constants.LOCAL_PVC_VOLUME_TYPE_PREFIX + \
            storage_backend + ':' + pvc_volume_type_name

    def _insert_pvc_volume_type(self, context, pvc_volume_type):
        storage_backend = ''
        extra_specs = pvc_volume_type.get('extra_specs')
        if extra_specs is None:
            extra_specs = {}
        elif 'capabilities:volume_backend_name' in extra_specs:
            storage_backend = \
                extra_specs['capabilities:volume_backend_name']
        # Overwrite the volume backend name
        extra_specs['capabilities:volume_backend_name'] = \
            constants.POWERVC_VOLUME_BACKEND

        volume_type = {
            'id': pvc_volume_type.get('id'),
            'name': self._mask_pvc_volume_type_name(
                pvc_volume_type.get('name'), storage_backend),
            'extra_specs': extra_specs
        }

        ret = None
        try:
            ret = db.volume_type_create(context, volume_type)
        except Exception as e:
            ret = None
            LOG.debug(_("Failed to create volume type %s, Exception: %s") %
                      (volume_type['name'], e))
        return ret

    def _sync_existing_volume_type(self, context, local_volume_type,
                                   pvc_volume_type):
        if local_volume_type is None or pvc_volume_type is None:
            return False

        extra_specs = pvc_volume_type.get('extra_specs')
        if extra_specs is None:
            extra_specs = {}
        # Overwrite the volume backend name
        extra_specs['capabilities:volume_backend_name'] = \
            constants.POWERVC_VOLUME_BACKEND

        try:
            db.volume_type_extra_specs_update_or_create(
                context, local_volume_type.get('id'), extra_specs)
        except Exception as e:
            LOG.debug(_("Failed to update volume type %s, Exception: %s") %
                      (local_volume_type.get('id'), e))
            return False
        return True

    def _unregister_volume_types(self, ctx, vol_type_id):
        """
        Unregister the volume type from the local database. This does not
        use the Cinder API, which would send an RPC to have the volume
        type deleted. The volume type has already been removed from
        PowerVC, so we just remove it from the local database.
        """
        # If the volume type does not exist then ignore it
        if vol_type_id is None:
            LOG.debug('Volume type does not exist locally')
            return False
        try:
            db.volume_type_destroy(ctx, vol_type_id)
        except Exception as e:
            LOG.debug(_("Failed to delete volume type %s, Exception: %s") %
                      (vol_type_id, e))
            return False
        return True

    def _is_valid_pvc_volume_type(self, context, local_volume_type,
                                  pvc_volume_types):
        found = False
        for volume_type in pvc_volume_types:
            pvc_vol_type = volume_type.__dict__
            if local_volume_type.get('id') == pvc_vol_type.get('id'):
                found = True
                break
        return found

    def _handle_powervc_volume_create(self, context=None, ctxt=None,
                                      event_type=None, payload=None):
        """Handle volume create messages sent from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        pvc_volume_id = payload.get('volume_id')
        # Wait 15 seconds to avoid the time window that would create a
        # duplicated volume
        time.sleep(15)
        # If the volume already exists locally then ignore the event
        local_volume = self._get_local_volume_by_pvc_id(context,
                                                        pvc_volume_id)
        if local_volume is not None:
            LOG.debug('Volume already exists locally')
            return

        volume = self._service.get_volume_by_id(pvc_volume_id)
        if volume is not None:
            volume_id = volume.__dict__.get("id")
            scg_accessible_volumes = self._service.get_volumes()
            for accessible_volume in scg_accessible_volumes:
                accessible_volume_id = accessible_volume.__dict__.get("id")
                if accessible_volume_id == volume_id:
                    self._insert_pvc_volume(context, volume.__dict__)
                    return
        LOG.debug('Volume not accessible, ignored!')
        return

    def _handle_powervc_volume_delete(self, context=None, ctxt=None,
                                      event_type=None, payload=None):
        """Handle volume delete messages sent from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        # Wait 15 seconds to avoid the time window that would lead to a
        # duplicated volume delete
        time.sleep(15)
        pvc_volume_id = payload.get('volume_id')
        # If the volume does not exist locally then ignore the event
        local_volume = self._get_local_volume_by_pvc_id(context,
                                                        pvc_volume_id)
        if local_volume is None:
            LOG.debug('Volume does not exist locally, ignore the delete '
                      'event')
            return

        self._unregister_volumes(context, local_volume)

    def _handle_powervc_volume_update(self, context=None, ctxt=None,
                                      event_type=None, payload=None):
        """Handle volume update, attach, and detach messages from PowerVC.

        :param: context The security context
        :param: ctxt message context
        :param: event_type message event type
        :param: payload The AMQP message sent from OpenStack (dictionary)
        """
        pvc_volume_id = payload.get('volume_id')
        local_volume = self._get_local_volume_by_pvc_id(context,
                                                        pvc_volume_id)
        if local_volume is None:
            LOG.debug('Volume does not exist locally, ignore the update '
                      'event')
            return

        pvc_volume = self._service.get_volume_by_id(pvc_volume_id)
        if pvc_volume is not None:
            self._sync_existing_volume(context, local_volume,
                                       pvc_volume.__dict__)
        else:
            LOG.debug('Tried to update the volume but it could not '
                      'be found in PowerVC')

    def _get_all_local_volumes(self, context):
        local_pvc_volumes = []
        try:
            db_matches = db.volume_get_all(context, marker=None, limit=None,
                                           sort_keys=['created_at'],
                                           sort_dirs=['desc'])
            for local_volume in db_matches:
                if self._get_pvc_id_from_local_volume(local_volume) \
                        is not None:
                    local_pvc_volumes.append(local_volume)
        except Exception as e:
            local_pvc_volumes = None
            LOG.debug(_('Failed to get all local volumes, Exception: %s') %
                      (e))
        return local_pvc_volumes

    def _get_local_volume_by_pvc_id(self, context, pvc_id, is_map=True):
        """Get a local volume by its PowerVC volume id."""
        ret_volume = None
        if pvc_id is None:
            return ret_volume
        if is_map is False:
            try:
                ret_volume = db.volume_get(context, pvc_id)
            except Exception:
                ret_volume = None
                LOG.debug(_('Volume %s could not be found.') % pvc_id)
        else:
            all_local_volumes = None
            try:
                all_local_volumes = self._get_all_local_volumes(context)
            except Exception:
                all_local_volumes = None
            if all_local_volumes is not None:
                for volume in all_local_volumes:
                    temp = self._get_pvc_id_from_local_volume(volume)
                    if temp == pvc_id:
                        ret_volume = volume
                        break
        return ret_volume

    def _get_local_volume_type_by_id(self, context, volume_type_id,
                                     inactive=False):
        """Get a local volume type by its volume type id."""
        ret_volume_type = None
        try:
            ret_volume_type = db.api.volume_type_get(context=context,
                                                     id=volume_type_id)
        except Exception as e:
            ret_volume_type = None
            LOG.debug(_("Failed to get local volume type by id [%s]. "
                        "Exception: %s") % (volume_type_id, e))
        return ret_volume_type

    def _get_local_volume_type_by_name(self, context, volume_type_name,
                                       inactive=False):
        ret_volume_type = None
        if volume_type_name is None:
            return ret_volume_type
        try:
            ret_volume_type = \
                db.api.volume_type_get_by_name(context, volume_type_name)
        except Exception as e:
            ret_volume_type = None
            LOG.debug(_("Failed to get local volume type by name [%s]."
                        " Exception: %s") % (volume_type_name, e))
        return ret_volume_type

    def _exist_local_volume_type(self, context, volume_type_id,
                                 searchInactive):
        """Check whether a volume type exists, by volume type id."""
        if volume_type_id is None:
            return False

        volume_type = self._get_local_volume_type_by_id(context,
                                                        volume_type_id,
                                                        False)
        if volume_type is not None:
            return True

        if searchInactive is True:
            volume_type = self._get_local_volume_type_by_id(context,
                                                            volume_type_id,
                                                            True)
            if volume_type is not None:
                return True
        return False

    def _get_matched_volume_by_pvc_id(self, local_volumes, pvcid):
        """Get the local volumes that match a PowerVC uuid."""
        matches = []
        if local_volumes is None or pvcid is None:
            return matches
        for item in local_volumes:
            volume_id = self._get_pvc_id_from_local_volume(item)
            if volume_id == pvcid:
                matches.append(item)
        return matches

    def _is_valid_pvc_volume(self, context, local_volume, pvc_volumes):
        found = False
        for volume in pvc_volumes:
            pvc_volume = volume.__dict__
            local_volume_id = \
                self._get_pvc_id_from_local_volume(local_volume)
            if local_volume_id == pvc_volume.get('id'):
                found = True
                break
        return found

    def _is_intermediate_state(self, local_volume=None, pvc_volume=None):
        intermediate_statuses = [constants.STATUS_ATTACHING]
        if local_volume:
            local_volume_state = local_volume.get('status')
            if local_volume_state in intermediate_statuses:
                return True
        if pvc_volume:
            pvc_volume_state = pvc_volume.get('status')
            if pvc_volume_state in intermediate_statuses:
                return True
        return False

    def _sync_existing_volume(self, context, local_volume, pvc_volume):
        ret = False
        if local_volume is None or pvc_volume is None:
            LOG.debug('Local volume or PowerVC volume is None, ignore it')
            return ret
        if not self._staging_cache.is_valid:
            LOG.warning(_("Staging user or project invalid."
                          " Skipping volume sync."))
            return ret
        if self._is_intermediate_state(local_volume, pvc_volume):
            LOG.info('Volume cannot be synced because the local or PowerVC '
                     'volume is in an intermediate state: %s, %s' %
                     (local_volume.get('status'), pvc_volume))
            return ret

        values = self._get_values_from_volume(context, pvc_volume,
                                              local_volume)
        try:
            db.volume_update(context, local_volume.get('id'), values)
            self._update_volume_attachments(context, pvc_volume,
                                            local_volume)
            ret = True
        except Exception as e:
            ret = False
            LOG.debug(_("Failed to update existing volume [%s]. "
                        "Exception: %s") %
                      (local_volume.get('display_name'), e))
        return ret

    def _unregister_volumes(self, context, local_volume):
        """
        Unregister the volume from the local database. This does not use
        the Cinder API, which would send an RPC to have the volume deleted.
        The volume has already been removed from PowerVC, so we just
        remove it from the local database and adjust the quotas.
        """
        ret = False
        volume_id = local_volume.get('id')
        volume_name = local_volume.get('display_name')
        volume_size = local_volume.get('size')
        if volume_id is None:
            LOG.debug('Volume id is None, ignore it')
            return ret
        try:
            # Check first whether the volume to be deleted exists
            volume_to_be_deleted = db.volume_get(context, volume_id)
            if volume_to_be_deleted:
                db.volume_destroy(context, volume_id)
                # Update the quotas
                reserve_opts = {'volumes': -1, 'gigabytes': -volume_size}
                reservations = QUOTAS.reserve(context, **reserve_opts)
                LOG.info(_("Start to deduct quota of volume: %s, size: %s")
                         % (volume_name, volume_size))
                QUOTAS.commit(context, reservations)
                ret = True
        except Exception as e:
            ret = False
            LOG.debug(_("Failed to delete local volume %s, Exception: %s") %
                      (volume_id, e))
        return ret

    def _insert_pvc_volume(self, context, volume):
        """Create one volume from a PowerVC volume."""
        if volume is None:
            LOG.debug("Volume is None, cannot insert it")
            return

        if self._is_intermediate_state(pvc_volume=volume):
            LOG.info('PowerVC volume is in an intermediate status, do not '
                     'insert it: %s' % volume)
            return

        volume_info = volume
        volume_type = volume_info.get('volume_type')
        volume_display_name = volume_info.get('display_name')

        if volume_type is None or volume_type == 'None':
            LOG.debug(_("Volume type is None for volume: %s") %
                      volume_display_name)
        else:
            LOG.debug("Check whether the volume type exists in the local "
                      "hosting OS, only including active volume types")
            pvc_volume_type = None
            try:
                pvc_volume_type = \
                    self._service.get_volume_type_by_name(volume_type)
            except Exception as e:
                LOG.debug(_("Failed to get volume type from "
                            "PowerVC by name [%s]. Exception: %s") %
                          (volume_type, e))

            if pvc_volume_type is not None:
                dict_pvc_volume_type = pvc_volume_type.__dict__
                exist_volume_type = self._exist_local_volume_type(
                    context, dict_pvc_volume_type.get("id"), False)
                if exist_volume_type is False:
                    LOG.debug(_("Volume type [%s] does not exist locally, "
                                "insert it into the hosting OS") %
                              volume_type)
                    try:
                        self._insert_pvc_volume_type(context,
                                                     dict_pvc_volume_type)
                    except Exception:
                        LOG.debug("Failed to insert volume type")
                    else:
                        LOG.debug("Insert volume type successfully")
                else:
                    LOG.debug(_("Volume type [%s] already exists") %
                              volume_type)

        values = self._get_values_from_volume(context, volume)
        if values is None:
            LOG.warning(_("Staging user or project invalid."
                          " Skipping volume sync."))
            return None
        else:
            try:
                local_volume = db.volume_create(context, values)
                # Update the instances that attach this volume
                self._update_volume_attachments(context, volume,
                                                local_volume)
                # Update the quotas
                volume_name = local_volume.get('name')
                volume_size = local_volume.get('size')
                reserve_opts = {'volumes': 1, 'gigabytes': volume_size}
                LOG.info(_("Start to reserve quota of volume: %s, size: %s")
                         % (volume_name, volume_size))
                reservations = QUOTAS.reserve(context, **reserve_opts)
                QUOTAS.commit(context, reservations)
            except Exception as e:
                LOG.debug(_("Failed to create volume %s."
                            " Exception: %s") % (str(values), str(e)))
                return None

            LOG.debug(_("Create volume %s successfully") % values)

    def _get_values_from_volume(self, context, volume, local_volume=None):
        if volume is None:
            return None

        project_id = None
        user_id = None
        if local_volume is None:
            user_id, project_id = \
                self._staging_cache.get_staging_user_and_project()
            if user_id is None:
                LOG.warning(_("Staging user or project invalid."))
                return None
        else:
            project_id = local_volume.get('project_id')
            user_id = local_volume.get('user_id')

        metadata = volume.get('metadata')
        if metadata is None:
            metadata = {}
        metadata[constants.LOCAL_PVC_PREFIX +
                 'os-vol-tenant-attr:tenant_id'] = \
            volume.get('os-vol-tenant-attr:tenant_id')

        health_value = None
        health_status = volume.get('health_status')
        if health_status is not None:
            health_value = health_status.get('health_value')
        metadata[constants.LOCAL_PVC_PREFIX +
                 'health_status.health_value'] = health_value

        metadata[constants.LOCAL_PVC_PREFIX + 'os-vol-host-attr:host'] = \
            volume.get('os-vol-host-attr:host')
        metadata[constants.LOCAL_PVC_PREFIX + 'id'] = volume.get('id')

        # Get the volume type id
        volume_type_id = None
        volume_type_name = volume.get('volume_type')
        if volume_type_name is not None and volume_type_name != 'None':
            storage_backend = volume.get('os-vol-host-attr:host')
            local_volume_type_name = self._mask_pvc_volume_type_name(
                volume_type_name, storage_backend)
            if local_volume_type_name is not None:
                volume_type = self.\
                    _get_local_volume_type_by_name(context,
                                                   local_volume_type_name)
                if volume_type is not None:
                    volume_type_id = volume_type.get('id')

        bootable = 0
        if volume.get('bootable') == 'true':
            bootable = 1

        host = CONF.host
        if CONF.enabled_backends is not None and \
                constants.BACKEND_POWERVCDRIVER in CONF.enabled_backends:
            host = "%s@%s" % (CONF.host, constants.BACKEND_POWERVCDRIVER)

        disp_name = volume.get('display_name') or volume.get('name')
        volume_desc = volume.get('display_description') or \
            volume.get('description')
        LOG.debug(_("volume disp_name: %s") % disp_name)
        multiattach = volume.get('multiattach', False) or False
        values = {'display_name': disp_name,
                  'display_description': volume_desc,
                  # 'volume_type_id': volume_type_id,
                  # 'id': volume['id'],
                  'status': volume.get('status'),
                  'host': host,
                  'size': volume.get('size'),
                  'availability_zone': volume.get('availability_zone'),
                  'bootable': bootable,
                  'snapshot_id': volume.get('snapshot_id'),
                  'source_volid': volume.get('source_volid'),
                  'metadata': metadata,
                  'project_id': project_id,
                  'user_id': user_id,
                  'multiattach': multiattach
                  }

        if volume_type_id is not None:
            values['volume_type_id'] = volume_type_id

        return values

    def _update_volume_attachments(self, context, pvc_volume, local_volume):
        """
        Add/Delete/Update volume attachment information, which includes
        the attaching instance, mountpoint, volume id, etc.

        Note that the pvc driver passes the pvc volume attachments'
        "attachment_id" to the db api, which will be set as the local
        attachment_id.
        """
        # Get the attachment information
        attachments = pvc_volume.get('attachments')
        pvc_att_list = []
        for attachment in attachments:
            pvc_att_id = attachment.get('attachment_id')
            pvc_att_list.append(pvc_att_id)
            try:
                local_att = db.volume_attachment_get(context, pvc_att_id)
                if local_att:
                    # If a local attachment is found by the pvc
                    # attachment's attachment_id, then update it
                    self._update_volume_attachment(context, attachment,
                                                   local_att.get('id'))
            except Exception:
                # If no local attachment is found, then insert a new one
                # whose id maps to the pvc attachment's attachment_id
                self._insert_volume_attachment(context, pvc_att_id,
                                               attachment, local_volume)

        # Clean up local attachments that are no longer in PowerVC
        local_vid = local_volume.get('id')
        attachments = db.volume_attachment_get_used_by_volume_id(context,
                                                                 local_vid)
        attachments_tbd = [att for att in attachments
                           if att.get('id') not in pvc_att_list]
        for attachment_tbd in attachments_tbd:
            db.volume_detached(context, local_vid,
                               attachment_tbd.get('id'))

    def _insert_volume_attachment(self, context, pvc_att_id, pvc_attachment,
                                  local_volume):
        # Set the new local attachment id equal to the pvc one to make the
        # sync possible
        local_att_id = pvc_att_id
        pvc_instance_uuid = pvc_attachment.get('server_id')
        local_instance_uuid = self._get_local_instance_id(pvc_instance_uuid)
        mountpoint = pvc_attachment.get('device')
        hostname = pvc_attachment.get('host_name')
        values = {'id': local_att_id,
                  'volume_id': local_volume.get('id'),
                  'server_id': local_instance_uuid}
        attachment = db.volume_attach(context, values)
        db.volume_attached(context, attachment['id'], local_instance_uuid,
                           hostname, mountpoint)

    def _update_volume_attachment(self, context, pvc_attachment,
                                  local_att_id):
        mountpoint = pvc_attachment.get('device')
        hostname = pvc_attachment.get('host_name')
        pvc_instance_uuid = pvc_attachment.get('server_id')
        local_instance_uuid = self._get_local_instance_id(pvc_instance_uuid)
        values = {'instance_uuid': local_instance_uuid,
                  'attached_host': hostname,
                  'mountpoint': mountpoint}
        db.volume_attachment_update(context, local_att_id, values)

    def _start_periodic_volume_sync(self, context):
        """
        Initialize the periodic syncing of volumes from PowerVC into the
        local OS. The powervc.volume_sync_interval config property
        determines how often the sync will occur, and the
        powervc.full_volume_sync_frequency config property determines the
        number of marked volume sync operations between full volume syncs.

        :param: context The security context
        """
        # Enforce some minimum values for the sync interval properties
        # TODO: Minimum values should at least be documented
        conf_sync = CONF.powervc.volume_sync_interval
        conf_full_sync = CONF.powervc.full_volume_sync_frequency
        sync_interval = conf_sync if conf_sync > 10 else 10
        full_sync_frequency = conf_full_sync if conf_full_sync > 2 else 2
        self._volume_sync_counter = 0

        def sync():
            """Called on the volume sync intervals"""
            self._volume_sync_counter += 1
            try:
                local_volumes = None
                is_full_sync = True
                # Check if it's time to do a full sync
                if self.full_volume_sync_required or \
                        self._volume_sync_counter == full_sync_frequency:
                    self.full_volume_sync_required = False
                    self._volume_sync_counter = 0
                    local_volumes = self._get_all_local_volumes(context)
                    LOG.debug('Syncing all volumes on interval')
                else:
                    # If there are no marked volumes to sync, stop here
                    if len(self.sync_volumes) == 0:
                        return
                    is_full_sync = False
                    local_volumes = self.sync_volumes
                    LOG.debug('Syncing marked volumes')

                pvc_volumes = self._service.get_volumes()
                self._synchronize_volumes_ex(context, local_volumes,
                                             pvc_volumes, is_full_sync)
            except Exception as e:
                LOG.exception(_("Error occurred during volume "
                                "synchronization: %s."), e)
                LOG.info(_("Volume synchronization will occur at the next "
                           "scheduled interval."))

        self.tg.add_timer(sync_interval, sync)

    def _synchronize_volumes_ex(self, context, local_volumes, pvc_volumes,
                                is_full_sync=True):
        """
        Synchronize volumes.
        """
        LOG.info(_("Volume synchronization started..."))
        if pvc_volumes is None:
            pvc_volumes = []
        if local_volumes is None:
            local_volumes = []

        count_created_volumes = 0
        count_updated_volumes = 0
        count_deleted_volumes = 0

        # Local ---> PowerVC
        # First delete local volumes that are no longer in PowerVC
        for local_volume in local_volumes:
            if not self._is_valid_pvc_volume(context, local_volume,
                                             pvc_volumes):
                # If it is no longer valid in PowerVC, delete it locally too
                self._unregister_volumes(context, local_volume)
                count_deleted_volumes += 1

        # Try to delete unused volume types. Passing None forces the helper
        # to fetch the information from both the local DB and PowerVC.
        deleted_volume_types = (
            self._delete_unused_volume_types(context,
                                             local_volume_types=None,
                                             pvc_volume_types=None))
        LOG.info('Deleted %i unused volume types while syncing volumes' %
                 deleted_volume_types)

        # PowerVC ---> Local
        for volume in pvc_volumes:
            pvc_volume = volume.__dict__
            matched_volumes = self._get_matched_volume_by_pvc_id(
                local_volumes, pvc_volume.get('id'))
            if len(matched_volumes) == 0:
                self._insert_pvc_volume(context, pvc_volume)
                count_created_volumes += 1
            else:
                if len(matched_volumes) > 1:
                    LOG.warning("More than one volume in DB matches one "
                                "PowerVC volume: " + pvc_volume.get('id'))
                    # TODO: We should do something about this but
                    # scheduling a sync won't help since that does nothing
                    # to remove duplicate local volumes.
                self._sync_existing_volume(context, matched_volumes[0],
                                           pvc_volume)
                count_updated_volumes += 1

        LOG.info("""
                 *******************************
                 Volume sync. is complete.
                 Full sync: %(full_sync)s
                 [ %(insert)s inserted,
                   %(update)s updated,
                   %(delete)s deleted ]
                 *******************************
                 """ %
                 {'full_sync': is_full_sync,
                  'insert': count_created_volumes,
                  'update': count_updated_volumes,
                  'delete': count_deleted_volumes})

    def _get_local_instance_id(self, pvc_instance_id, is_map=True):
        ret_instance_id = pvc_instance_id
        if is_map is False:
            return ret_instance_id
        if pvc_instance_id is None:
            return ret_instance_id

        from powervc.common.constants import SERVICE_TYPES
        # lazy import factory to avoid connect to env when load manager
        from powervc.common.client import factory
        novaclient = factory.LOCAL.get_client(str(SERVICE_TYPES.compute))
        local_instances = novaclient.manager.list_all_servers(
            search_opts={'all_tenants': 1})
        for inst in local_instances:
            metadata = inst._info['metadata']
            meta_pvc_id = None
            if 'pvc_id' in metadata:
                meta_pvc_id = metadata['pvc_id']
            if meta_pvc_id == pvc_instance_id:
                ret_instance_id = inst._info['id']
                break
        return ret_instance_id

    def _get_pvc_id_from_local_volume(self, local_volume, is_map=True):
        ret_pvc_id = None
        if local_volume is None:
            return ret_pvc_id
        if is_map is False:
            ret_pvc_id = local_volume.get('id')
        else:
            id_key = constants.LOCAL_PVC_PREFIX + 'id'
            if isinstance(local_volume,
                          cinder.db.sqlalchemy.models.Volume):
                metadata = local_volume.get('volume_metadata')
                for item in metadata:
                    if id_key == item['key']:
                        ret_pvc_id = item['value']
                        break
            elif isinstance(local_volume, dict):
                metadata = local_volume.get('metadata')
                if metadata is not None and id_key in metadata:
                    ret_pvc_id = metadata[id_key]
        return ret_pvc_id