Merge remote-tracking branch 'nova.old/master'
This commit is contained in:
commit
ce4aba643e
|
@ -0,0 +1,759 @@
|
|||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Instance Metadata information."""
|
||||
|
||||
import os
|
||||
import posixpath
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import base64
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from nova.api.ec2 import ec2utils
|
||||
from nova.api.metadata import password
|
||||
from nova.api.metadata import vendordata
|
||||
from nova.api.metadata import vendordata_dynamic
|
||||
from nova.api.metadata import vendordata_json
|
||||
from nova import block_device
|
||||
from nova.cells import opts as cells_opts
|
||||
from nova.cells import rpcapi as cells_rpcapi
|
||||
import nova.conf
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova import network
|
||||
from nova.network.security_group import openstack_driver
|
||||
from nova import objects
|
||||
from nova.objects import virt_device_metadata as metadata_obj
|
||||
from nova import utils
|
||||
from nova.virt import netutils
|
||||
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
VERSIONS = [
|
||||
'1.0',
|
||||
'2007-01-19',
|
||||
'2007-03-01',
|
||||
'2007-08-29',
|
||||
'2007-10-10',
|
||||
'2007-12-15',
|
||||
'2008-02-01',
|
||||
'2008-09-01',
|
||||
'2009-04-04',
|
||||
]
|
||||
|
||||
# NOTE(mikal): think of these strings as version numbers. They traditionally
|
||||
# correlate with OpenStack release dates, with all the changes for a given
|
||||
# release bundled into a single version. Note that versions in the future are
|
||||
# hidden from the listing, but can still be requested explicitly, which is
|
||||
# required for testing purposes. We know this isn't great, but its inherited
|
||||
# from EC2, which this needs to be compatible with.
|
||||
FOLSOM = '2012-08-10'
|
||||
GRIZZLY = '2013-04-04'
|
||||
HAVANA = '2013-10-17'
|
||||
LIBERTY = '2015-10-15'
|
||||
NEWTON_ONE = '2016-06-30'
|
||||
NEWTON_TWO = '2016-10-06'
|
||||
OCATA = '2017-02-22'
|
||||
|
||||
OPENSTACK_VERSIONS = [
|
||||
FOLSOM,
|
||||
GRIZZLY,
|
||||
HAVANA,
|
||||
LIBERTY,
|
||||
NEWTON_ONE,
|
||||
NEWTON_TWO,
|
||||
OCATA,
|
||||
]
|
||||
|
||||
VERSION = "version"
|
||||
CONTENT = "content"
|
||||
CONTENT_DIR = "content"
|
||||
MD_JSON_NAME = "meta_data.json"
|
||||
VD_JSON_NAME = "vendor_data.json"
|
||||
VD2_JSON_NAME = "vendor_data2.json"
|
||||
NW_JSON_NAME = "network_data.json"
|
||||
UD_NAME = "user_data"
|
||||
PASS_NAME = "password"
|
||||
MIME_TYPE_TEXT_PLAIN = "text/plain"
|
||||
MIME_TYPE_APPLICATION_JSON = "application/json"
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InvalidMetadataVersion(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InvalidMetadataPath(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class InstanceMetadata(object):
|
||||
"""Instance metadata."""
|
||||
|
||||
def __init__(self, instance, address=None, content=None, extra_md=None,
|
||||
network_info=None, vd_driver=None, network_metadata=None,
|
||||
request_context=None):
|
||||
"""Creation of this object should basically cover all time consuming
|
||||
collection. Methods after that should not cause time delays due to
|
||||
network operations or lengthy cpu operations.
|
||||
|
||||
The user should then get a single instance and make multiple method
|
||||
calls on it.
|
||||
"""
|
||||
if not content:
|
||||
content = []
|
||||
|
||||
ctxt = context.get_admin_context()
|
||||
|
||||
# NOTE(danms): Sanitize the instance to limit the amount of stuff
|
||||
# inside that may not pickle well (i.e. context). We also touch
|
||||
# some of the things we'll lazy load later to make sure we keep their
|
||||
# values in what we cache.
|
||||
instance.ec2_ids
|
||||
instance.keypairs
|
||||
instance.device_metadata
|
||||
instance = objects.Instance.obj_from_primitive(
|
||||
instance.obj_to_primitive())
|
||||
|
||||
# The default value of mimeType is set to MIME_TYPE_TEXT_PLAIN
|
||||
self.set_mimetype(MIME_TYPE_TEXT_PLAIN)
|
||||
self.instance = instance
|
||||
self.extra_md = extra_md
|
||||
|
||||
self.availability_zone = instance.get('availability_zone')
|
||||
|
||||
secgroup_api = openstack_driver.get_openstack_security_group_driver()
|
||||
self.security_groups = secgroup_api.get_instance_security_groups(
|
||||
ctxt, instance)
|
||||
|
||||
self.mappings = _format_instance_mapping(ctxt, instance)
|
||||
|
||||
if instance.user_data is not None:
|
||||
self.userdata_raw = base64.decode_as_bytes(instance.user_data)
|
||||
else:
|
||||
self.userdata_raw = None
|
||||
|
||||
self.address = address
|
||||
|
||||
# expose instance metadata.
|
||||
self.launch_metadata = utils.instance_meta(instance)
|
||||
|
||||
self.password = password.extract_password(instance)
|
||||
|
||||
self.uuid = instance.uuid
|
||||
|
||||
self.content = {}
|
||||
self.files = []
|
||||
|
||||
# get network info, and the rendered network template
|
||||
if network_info is None:
|
||||
network_info = instance.info_cache.network_info
|
||||
|
||||
# expose network metadata
|
||||
if network_metadata is None:
|
||||
self.network_metadata = netutils.get_network_metadata(network_info)
|
||||
else:
|
||||
self.network_metadata = network_metadata
|
||||
|
||||
self.ip_info = \
|
||||
ec2utils.get_ip_info_for_instance_from_nw_info(network_info)
|
||||
|
||||
self.network_config = None
|
||||
cfg = netutils.get_injected_network_template(network_info)
|
||||
|
||||
if cfg:
|
||||
key = "%04i" % len(self.content)
|
||||
self.content[key] = cfg
|
||||
self.network_config = {"name": "network_config",
|
||||
'content_path': "/%s/%s" % (CONTENT_DIR, key)}
|
||||
|
||||
# 'content' is passed in from the configdrive code in
|
||||
# nova/virt/libvirt/driver.py. That's how we get the injected files
|
||||
# (personalities) in. AFAIK they're not stored in the db at all,
|
||||
# so are not available later (web service metadata time).
|
||||
for (path, contents) in content:
|
||||
key = "%04i" % len(self.content)
|
||||
self.files.append({'path': path,
|
||||
'content_path': "/%s/%s" % (CONTENT_DIR, key)})
|
||||
self.content[key] = contents
|
||||
|
||||
if vd_driver is None:
|
||||
vdclass = importutils.import_class(CONF.vendordata_driver)
|
||||
else:
|
||||
vdclass = vd_driver
|
||||
|
||||
self.vddriver = vdclass(instance=instance, address=address,
|
||||
extra_md=extra_md, network_info=network_info)
|
||||
|
||||
self.route_configuration = None
|
||||
|
||||
# NOTE(mikal): the decision to not pass extra_md here like we
|
||||
# do to the StaticJSON driver is deliberate. extra_md will
|
||||
# contain the admin password for the instance, and we shouldn't
|
||||
# pass that to external services.
|
||||
self.vendordata_providers = {
|
||||
'StaticJSON': vendordata_json.JsonFileVendorData(
|
||||
instance=instance, address=address,
|
||||
extra_md=extra_md, network_info=network_info),
|
||||
'DynamicJSON': vendordata_dynamic.DynamicVendorData(
|
||||
instance=instance, address=address,
|
||||
network_info=network_info, context=request_context)
|
||||
}
|
||||
|
||||
def _route_configuration(self):
|
||||
if self.route_configuration:
|
||||
return self.route_configuration
|
||||
|
||||
path_handlers = {UD_NAME: self._user_data,
|
||||
PASS_NAME: self._password,
|
||||
VD_JSON_NAME: self._vendor_data,
|
||||
VD2_JSON_NAME: self._vendor_data2,
|
||||
MD_JSON_NAME: self._metadata_as_json,
|
||||
NW_JSON_NAME: self._network_data,
|
||||
VERSION: self._handle_version,
|
||||
CONTENT: self._handle_content}
|
||||
|
||||
self.route_configuration = RouteConfiguration(path_handlers)
|
||||
return self.route_configuration
|
||||
|
||||
def set_mimetype(self, mime_type):
|
||||
self.md_mimetype = mime_type
|
||||
|
||||
def get_mimetype(self):
|
||||
return self.md_mimetype
|
||||
|
||||
def get_ec2_metadata(self, version):
|
||||
if version == "latest":
|
||||
version = VERSIONS[-1]
|
||||
|
||||
if version not in VERSIONS:
|
||||
raise InvalidMetadataVersion(version)
|
||||
|
||||
hostname = self._get_hostname()
|
||||
|
||||
floating_ips = self.ip_info['floating_ips']
|
||||
floating_ip = floating_ips and floating_ips[0] or ''
|
||||
|
||||
fixed_ips = self.ip_info['fixed_ips']
|
||||
fixed_ip = fixed_ips and fixed_ips[0] or ''
|
||||
|
||||
fmt_sgroups = [x['name'] for x in self.security_groups]
|
||||
|
||||
meta_data = {
|
||||
'ami-id': self.instance.ec2_ids.ami_id,
|
||||
'ami-launch-index': self.instance.launch_index,
|
||||
'ami-manifest-path': 'FIXME',
|
||||
'instance-id': self.instance.ec2_ids.instance_id,
|
||||
'hostname': hostname,
|
||||
'local-ipv4': fixed_ip or self.address,
|
||||
'reservation-id': self.instance.reservation_id,
|
||||
'security-groups': fmt_sgroups}
|
||||
|
||||
# public keys are strangely rendered in ec2 metadata service
|
||||
# meta-data/public-keys/ returns '0=keyname' (with no trailing /)
|
||||
# and only if there is a public key given.
|
||||
# '0=keyname' means there is a normally rendered dict at
|
||||
# meta-data/public-keys/0
|
||||
#
|
||||
# meta-data/public-keys/ : '0=%s' % keyname
|
||||
# meta-data/public-keys/0/ : 'openssh-key'
|
||||
# meta-data/public-keys/0/openssh-key : '%s' % publickey
|
||||
if self.instance.key_name:
|
||||
meta_data['public-keys'] = {
|
||||
'0': {'_name': "0=" + self.instance.key_name,
|
||||
'openssh-key': self.instance.key_data}}
|
||||
|
||||
if self._check_version('2007-01-19', version):
|
||||
meta_data['local-hostname'] = hostname
|
||||
meta_data['public-hostname'] = hostname
|
||||
meta_data['public-ipv4'] = floating_ip
|
||||
|
||||
if False and self._check_version('2007-03-01', version):
|
||||
# TODO(vish): store product codes
|
||||
meta_data['product-codes'] = []
|
||||
|
||||
if self._check_version('2007-08-29', version):
|
||||
instance_type = self.instance.get_flavor()
|
||||
meta_data['instance-type'] = instance_type['name']
|
||||
|
||||
if False and self._check_version('2007-10-10', version):
|
||||
# TODO(vish): store ancestor ids
|
||||
meta_data['ancestor-ami-ids'] = []
|
||||
|
||||
if self._check_version('2007-12-15', version):
|
||||
meta_data['block-device-mapping'] = self.mappings
|
||||
if self.instance.ec2_ids.kernel_id:
|
||||
meta_data['kernel-id'] = self.instance.ec2_ids.kernel_id
|
||||
if self.instance.ec2_ids.ramdisk_id:
|
||||
meta_data['ramdisk-id'] = self.instance.ec2_ids.ramdisk_id
|
||||
|
||||
if self._check_version('2008-02-01', version):
|
||||
meta_data['placement'] = {'availability-zone':
|
||||
self.availability_zone}
|
||||
|
||||
if self._check_version('2008-09-01', version):
|
||||
meta_data['instance-action'] = 'none'
|
||||
|
||||
data = {'meta-data': meta_data}
|
||||
if self.userdata_raw is not None:
|
||||
data['user-data'] = self.userdata_raw
|
||||
|
||||
return data
|
||||
|
||||
def get_ec2_item(self, path_tokens):
|
||||
# get_ec2_metadata returns dict without top level version
|
||||
data = self.get_ec2_metadata(path_tokens[0])
|
||||
return find_path_in_tree(data, path_tokens[1:])
|
||||
|
||||
def get_openstack_item(self, path_tokens):
|
||||
if path_tokens[0] == CONTENT_DIR:
|
||||
return self._handle_content(path_tokens)
|
||||
return self._route_configuration().handle_path(path_tokens)
|
||||
|
||||
def _metadata_as_json(self, version, path):
|
||||
metadata = {'uuid': self.uuid}
|
||||
if self.launch_metadata:
|
||||
metadata['meta'] = self.launch_metadata
|
||||
if self.files:
|
||||
metadata['files'] = self.files
|
||||
if self.extra_md:
|
||||
metadata.update(self.extra_md)
|
||||
if self.network_config:
|
||||
metadata['network_config'] = self.network_config
|
||||
|
||||
if self.instance.key_name:
|
||||
if cells_opts.get_cell_type() == 'compute':
|
||||
cells_api = cells_rpcapi.CellsAPI()
|
||||
try:
|
||||
keypair = cells_api.get_keypair_at_top(
|
||||
context.get_admin_context(), self.instance.user_id,
|
||||
self.instance.key_name)
|
||||
except exception.KeypairNotFound:
|
||||
# NOTE(lpigueir): If keypair was deleted, treat
|
||||
# it like it never had any
|
||||
keypair = None
|
||||
else:
|
||||
keypairs = self.instance.keypairs
|
||||
# NOTE(mriedem): It's possible for the keypair to be deleted
|
||||
# before it was migrated to the instance_extra table, in which
|
||||
# case lazy-loading instance.keypairs will handle the 404 and
|
||||
# just set an empty KeyPairList object on the instance.
|
||||
keypair = keypairs[0] if keypairs else None
|
||||
|
||||
if keypair:
|
||||
metadata['public_keys'] = {
|
||||
keypair.name: keypair.public_key,
|
||||
}
|
||||
|
||||
metadata['keys'] = [
|
||||
{'name': keypair.name,
|
||||
'type': keypair.type,
|
||||
'data': keypair.public_key}
|
||||
]
|
||||
else:
|
||||
LOG.debug("Unable to find keypair for instance with "
|
||||
"key name '%s'.", self.instance.key_name,
|
||||
instance=self.instance)
|
||||
|
||||
metadata['hostname'] = self._get_hostname()
|
||||
metadata['name'] = self.instance.display_name
|
||||
metadata['launch_index'] = self.instance.launch_index
|
||||
metadata['availability_zone'] = self.availability_zone
|
||||
|
||||
if self._check_os_version(GRIZZLY, version):
|
||||
metadata['random_seed'] = base64.encode_as_text(os.urandom(512))
|
||||
|
||||
if self._check_os_version(LIBERTY, version):
|
||||
metadata['project_id'] = self.instance.project_id
|
||||
|
||||
if self._check_os_version(NEWTON_ONE, version):
|
||||
metadata['devices'] = self._get_device_metadata(version)
|
||||
|
||||
self.set_mimetype(MIME_TYPE_APPLICATION_JSON)
|
||||
return jsonutils.dump_as_bytes(metadata)
|
||||
|
||||
def _get_device_metadata(self, version):
|
||||
"""Build a device metadata dict based on the metadata objects. This is
|
||||
done here in the metadata API as opposed to in the objects themselves
|
||||
because the metadata dict is part of the guest API and thus must be
|
||||
controlled.
|
||||
"""
|
||||
device_metadata_list = []
|
||||
vif_vlans_supported = self._check_os_version(OCATA, version)
|
||||
if self.instance.device_metadata is not None:
|
||||
for device in self.instance.device_metadata.devices:
|
||||
device_metadata = {}
|
||||
bus = 'none'
|
||||
address = 'none'
|
||||
|
||||
if 'bus' in device:
|
||||
# TODO(artom/mriedem) It would be nice if we had something
|
||||
# more generic, like a type identifier or something, built
|
||||
# into these types of objects, like a get_meta_type()
|
||||
# abstract method on the base DeviceBus class.
|
||||
if isinstance(device.bus, metadata_obj.PCIDeviceBus):
|
||||
bus = 'pci'
|
||||
elif isinstance(device.bus, metadata_obj.USBDeviceBus):
|
||||
bus = 'usb'
|
||||
elif isinstance(device.bus, metadata_obj.SCSIDeviceBus):
|
||||
bus = 'scsi'
|
||||
elif isinstance(device.bus, metadata_obj.IDEDeviceBus):
|
||||
bus = 'ide'
|
||||
elif isinstance(device.bus, metadata_obj.XenDeviceBus):
|
||||
bus = 'xen'
|
||||
else:
|
||||
LOG.debug('Metadata for device with unknown bus %s '
|
||||
'has not been included in the '
|
||||
'output', device.bus.__class__.__name__)
|
||||
continue
|
||||
if 'address' in device.bus:
|
||||
address = device.bus.address
|
||||
|
||||
if isinstance(device, metadata_obj.NetworkInterfaceMetadata):
|
||||
vlan = None
|
||||
if vif_vlans_supported and 'vlan' in device:
|
||||
vlan = device.vlan
|
||||
|
||||
# Skip devices without tags on versions that
|
||||
# don't support vlans
|
||||
if not (vlan or 'tags' in device):
|
||||
continue
|
||||
|
||||
device_metadata['type'] = 'nic'
|
||||
device_metadata['mac'] = device.mac
|
||||
if vlan:
|
||||
device_metadata['vlan'] = vlan
|
||||
elif isinstance(device, metadata_obj.DiskMetadata):
|
||||
device_metadata['type'] = 'disk'
|
||||
# serial and path are optional parameters
|
||||
if 'serial' in device:
|
||||
device_metadata['serial'] = device.serial
|
||||
if 'path' in device:
|
||||
device_metadata['path'] = device.path
|
||||
else:
|
||||
LOG.debug('Metadata for device of unknown type %s has not '
|
||||
'been included in the '
|
||||
'output', device.__class__.__name__)
|
||||
continue
|
||||
|
||||
device_metadata['bus'] = bus
|
||||
device_metadata['address'] = address
|
||||
if 'tags' in device:
|
||||
device_metadata['tags'] = device.tags
|
||||
|
||||
device_metadata_list.append(device_metadata)
|
||||
return device_metadata_list
|
||||
|
||||
def _handle_content(self, path_tokens):
|
||||
if len(path_tokens) == 1:
|
||||
raise KeyError("no listing for %s" % "/".join(path_tokens))
|
||||
if len(path_tokens) != 2:
|
||||
raise KeyError("Too many tokens for /%s" % CONTENT_DIR)
|
||||
return self.content[path_tokens[1]]
|
||||
|
||||
def _handle_version(self, version, path):
|
||||
# request for /version, give a list of what is available
|
||||
ret = [MD_JSON_NAME]
|
||||
if self.userdata_raw is not None:
|
||||
ret.append(UD_NAME)
|
||||
if self._check_os_version(GRIZZLY, version):
|
||||
ret.append(PASS_NAME)
|
||||
if self._check_os_version(HAVANA, version):
|
||||
ret.append(VD_JSON_NAME)
|
||||
if self._check_os_version(LIBERTY, version):
|
||||
ret.append(NW_JSON_NAME)
|
||||
if self._check_os_version(NEWTON_TWO, version):
|
||||
ret.append(VD2_JSON_NAME)
|
||||
|
||||
return ret
|
||||
|
||||
def _user_data(self, version, path):
|
||||
if self.userdata_raw is None:
|
||||
raise KeyError(path)
|
||||
return self.userdata_raw
|
||||
|
||||
def _network_data(self, version, path):
|
||||
if self.network_metadata is None:
|
||||
return jsonutils.dump_as_bytes({})
|
||||
return jsonutils.dump_as_bytes(self.network_metadata)
|
||||
|
||||
def _password(self, version, path):
|
||||
if self._check_os_version(GRIZZLY, version):
|
||||
return password.handle_password
|
||||
raise KeyError(path)
|
||||
|
||||
def _vendor_data(self, version, path):
|
||||
if self._check_os_version(HAVANA, version):
|
||||
self.set_mimetype(MIME_TYPE_APPLICATION_JSON)
|
||||
|
||||
# NOTE(mikal): backwards compatibility... If the deployer has
|
||||
# specified providers, and one of those providers is StaticJSON,
|
||||
# then do that thing here. Otherwise, if the deployer has
|
||||
# specified an old style driver here, then use that. This second
|
||||
# bit can be removed once old style vendordata is fully deprecated
|
||||
# and removed.
|
||||
if (CONF.api.vendordata_providers and
|
||||
'StaticJSON' in CONF.api.vendordata_providers):
|
||||
return jsonutils.dump_as_bytes(
|
||||
self.vendordata_providers['StaticJSON'].get())
|
||||
else:
|
||||
# TODO(mikal): when we removed the old style vendordata
|
||||
# drivers, we need to remove self.vddriver as well.
|
||||
return jsonutils.dump_as_bytes(self.vddriver.get())
|
||||
|
||||
raise KeyError(path)
|
||||
|
||||
def _vendor_data2(self, version, path):
|
||||
if self._check_os_version(NEWTON_TWO, version):
|
||||
self.set_mimetype(MIME_TYPE_APPLICATION_JSON)
|
||||
|
||||
j = {}
|
||||
for provider in CONF.api.vendordata_providers:
|
||||
if provider == 'StaticJSON':
|
||||
j['static'] = self.vendordata_providers['StaticJSON'].get()
|
||||
else:
|
||||
values = self.vendordata_providers[provider].get()
|
||||
for key in list(values):
|
||||
if key in j:
|
||||
LOG.warning('Removing duplicate metadata key: %s',
|
||||
key, instance=self.instance)
|
||||
del values[key]
|
||||
j.update(values)
|
||||
|
||||
return jsonutils.dump_as_bytes(j)
|
||||
|
||||
raise KeyError(path)
|
||||
|
||||
def _check_version(self, required, requested, versions=VERSIONS):
|
||||
return versions.index(requested) >= versions.index(required)
|
||||
|
||||
def _check_os_version(self, required, requested):
|
||||
return self._check_version(required, requested, OPENSTACK_VERSIONS)
|
||||
|
||||
def _get_hostname(self):
|
||||
return "%s%s%s" % (self.instance.hostname,
|
||||
'.' if CONF.dhcp_domain else '',
|
||||
CONF.dhcp_domain)
|
||||
|
||||
def lookup(self, path):
|
||||
if path == "" or path[0] != "/":
|
||||
path = posixpath.normpath("/" + path)
|
||||
else:
|
||||
path = posixpath.normpath(path)
|
||||
|
||||
# Set default mimeType. It will be modified only if there is a change
|
||||
self.set_mimetype(MIME_TYPE_TEXT_PLAIN)
|
||||
|
||||
# fix up requests, prepending /ec2 to anything that does not match
|
||||
path_tokens = path.split('/')[1:]
|
||||
if path_tokens[0] not in ("ec2", "openstack"):
|
||||
if path_tokens[0] == "":
|
||||
# request for /
|
||||
path_tokens = ["ec2"]
|
||||
else:
|
||||
path_tokens = ["ec2"] + path_tokens
|
||||
path = "/" + "/".join(path_tokens)
|
||||
|
||||
# all values of 'path' input starts with '/' and have no trailing /
|
||||
|
||||
# specifically handle the top level request
|
||||
if len(path_tokens) == 1:
|
||||
if path_tokens[0] == "openstack":
|
||||
# NOTE(vish): don't show versions that are in the future
|
||||
today = timeutils.utcnow().strftime("%Y-%m-%d")
|
||||
versions = [v for v in OPENSTACK_VERSIONS if v <= today]
|
||||
if OPENSTACK_VERSIONS != versions:
|
||||
LOG.debug("future versions %s hidden in version list",
|
||||
[v for v in OPENSTACK_VERSIONS
|
||||
if v not in versions], instance=self.instance)
|
||||
versions += ["latest"]
|
||||
else:
|
||||
versions = VERSIONS + ["latest"]
|
||||
return versions
|
||||
|
||||
try:
|
||||
if path_tokens[0] == "openstack":
|
||||
data = self.get_openstack_item(path_tokens[1:])
|
||||
else:
|
||||
data = self.get_ec2_item(path_tokens[1:])
|
||||
except (InvalidMetadataVersion, KeyError):
|
||||
raise InvalidMetadataPath(path)
|
||||
|
||||
return data
|
||||
|
||||
def metadata_for_config_drive(self):
|
||||
"""Yields (path, value) tuples for metadata elements."""
|
||||
# EC2 style metadata
|
||||
for version in VERSIONS + ["latest"]:
|
||||
if version in CONF.api.config_drive_skip_versions.split(' '):
|
||||
continue
|
||||
|
||||
data = self.get_ec2_metadata(version)
|
||||
if 'user-data' in data:
|
||||
filepath = os.path.join('ec2', version, 'user-data')
|
||||
yield (filepath, data['user-data'])
|
||||
del data['user-data']
|
||||
|
||||
try:
|
||||
del data['public-keys']['0']['_name']
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
filepath = os.path.join('ec2', version, 'meta-data.json')
|
||||
yield (filepath, jsonutils.dump_as_bytes(data['meta-data']))
|
||||
|
||||
ALL_OPENSTACK_VERSIONS = OPENSTACK_VERSIONS + ["latest"]
|
||||
for version in ALL_OPENSTACK_VERSIONS:
|
||||
path = 'openstack/%s/%s' % (version, MD_JSON_NAME)
|
||||
yield (path, self.lookup(path))
|
||||
|
||||
path = 'openstack/%s/%s' % (version, UD_NAME)
|
||||
if self.userdata_raw is not None:
|
||||
yield (path, self.lookup(path))
|
||||
|
||||
if self._check_version(HAVANA, version, ALL_OPENSTACK_VERSIONS):
|
||||
path = 'openstack/%s/%s' % (version, VD_JSON_NAME)
|
||||
yield (path, self.lookup(path))
|
||||
|
||||
if self._check_version(LIBERTY, version, ALL_OPENSTACK_VERSIONS):
|
||||
path = 'openstack/%s/%s' % (version, NW_JSON_NAME)
|
||||
yield (path, self.lookup(path))
|
||||
|
||||
if self._check_version(NEWTON_TWO, version,
|
||||
ALL_OPENSTACK_VERSIONS):
|
||||
path = 'openstack/%s/%s' % (version, VD2_JSON_NAME)
|
||||
yield (path, self.lookup(path))
|
||||
|
||||
for (cid, content) in self.content.items():
|
||||
yield ('%s/%s/%s' % ("openstack", CONTENT_DIR, cid), content)
|
||||
|
||||
|
||||
class RouteConfiguration(object):
|
||||
"""Routes metadata paths to request handlers."""
|
||||
|
||||
def __init__(self, path_handler):
|
||||
self.path_handlers = path_handler
|
||||
|
||||
def _version(self, version):
|
||||
if version == "latest":
|
||||
version = OPENSTACK_VERSIONS[-1]
|
||||
|
||||
if version not in OPENSTACK_VERSIONS:
|
||||
raise InvalidMetadataVersion(version)
|
||||
|
||||
return version
|
||||
|
||||
def handle_path(self, path_tokens):
|
||||
version = self._version(path_tokens[0])
|
||||
if len(path_tokens) == 1:
|
||||
path = VERSION
|
||||
else:
|
||||
path = '/'.join(path_tokens[1:])
|
||||
|
||||
path_handler = self.path_handlers[path]
|
||||
|
||||
if path_handler is None:
|
||||
raise KeyError(path)
|
||||
|
||||
return path_handler(version, path)
|
||||
|
||||
|
||||
def get_metadata_by_address(address):
|
||||
ctxt = context.get_admin_context()
|
||||
fixed_ip = network.API().get_fixed_ip_by_address(ctxt, address)
|
||||
LOG.info('Fixed IP %(ip)s translates to instance UUID %(uuid)s',
|
||||
{'ip': address, 'uuid': fixed_ip['instance_uuid']})
|
||||
|
||||
return get_metadata_by_instance_id(fixed_ip['instance_uuid'],
|
||||
address,
|
||||
ctxt)
|
||||
|
||||
|
||||
def get_metadata_by_instance_id(instance_id, address, ctxt=None):
|
||||
ctxt = ctxt or context.get_admin_context()
|
||||
attrs = ['ec2_ids', 'flavor', 'info_cache',
|
||||
'metadata', 'system_metadata',
|
||||
'security_groups', 'keypairs',
|
||||
'device_metadata']
|
||||
try:
|
||||
im = objects.InstanceMapping.get_by_instance_uuid(ctxt, instance_id)
|
||||
except exception.InstanceMappingNotFound:
|
||||
LOG.warning('Instance mapping for %(uuid)s not found; '
|
||||
'cell setup is incomplete', {'uuid': instance_id})
|
||||
instance = objects.Instance.get_by_uuid(ctxt, instance_id,
|
||||
expected_attrs=attrs)
|
||||
return InstanceMetadata(instance, address)
|
||||
|
||||
with context.target_cell(ctxt, im.cell_mapping) as cctxt:
|
||||
instance = objects.Instance.get_by_uuid(cctxt, instance_id,
|
||||
expected_attrs=attrs)
|
||||
return InstanceMetadata(instance, address)
|
||||
|
||||
|
||||
def _format_instance_mapping(ctxt, instance):
|
||||
bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
|
||||
ctxt, instance.uuid)
|
||||
return block_device.instance_block_mapping(instance, bdms)
|
||||
|
||||
|
||||
def ec2_md_print(data):
|
||||
if isinstance(data, dict):
|
||||
output = ''
|
||||
for key in sorted(data.keys()):
|
||||
if key == '_name':
|
||||
continue
|
||||
if isinstance(data[key], dict):
|
||||
if '_name' in data[key]:
|
||||
output += str(data[key]['_name'])
|
||||
else:
|
||||
output += key + '/'
|
||||
else:
|
||||
output += key
|
||||
|
||||
output += '\n'
|
||||
return output[:-1]
|
||||
elif isinstance(data, list):
|
||||
return '\n'.join(data)
|
||||
elif isinstance(data, (bytes, six.text_type)):
|
||||
return data
|
||||
else:
|
||||
return str(data)
|
||||
|
||||
|
||||
def find_path_in_tree(data, path_tokens):
|
||||
# given a dict/list tree, and a path in that tree, return data found there.
|
||||
for i in range(0, len(path_tokens)):
|
||||
if isinstance(data, dict) or isinstance(data, list):
|
||||
if path_tokens[i] in data:
|
||||
data = data[path_tokens[i]]
|
||||
else:
|
||||
raise KeyError("/".join(path_tokens[0:i]))
|
||||
else:
|
||||
if i != len(path_tokens) - 1:
|
||||
raise KeyError("/".join(path_tokens[0:i]))
|
||||
data = data[path_tokens[i]]
|
||||
return data
|
||||
|
||||
|
||||
# NOTE(mikal): this alias is to stop old style vendordata plugins from breaking
|
||||
# post refactor. It should be removed when we finish deprecating those plugins.
|
||||
VendorDataDriver = vendordata.VendorDataDriver
|
|
@ -0,0 +1,185 @@
|
|||
# Copyright (c) 2012 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
'''
|
||||
Websocket proxy that is compatible with OpenStack Nova.
|
||||
Leverages websockify.py by Joel Martin
|
||||
'''
|
||||
|
||||
import socket
|
||||
import sys
|
||||
|
||||
from oslo_log import log as logging
|
||||
from six.moves import http_cookies as Cookie
|
||||
import six.moves.urllib.parse as urlparse
|
||||
import websockify
|
||||
|
||||
import nova.conf
|
||||
from nova.consoleauth import rpcapi as consoleauth_rpcapi
|
||||
from nova import context
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
class NovaProxyRequestHandlerBase(object):
|
||||
def address_string(self):
|
||||
# NOTE(rpodolyaka): override the superclass implementation here and
|
||||
# explicitly disable the reverse DNS lookup, which might fail on some
|
||||
# deployments due to DNS configuration and break VNC access completely
|
||||
return str(self.client_address[0])
|
||||
|
||||
def verify_origin_proto(self, connection_info, origin_proto):
|
||||
access_url = connection_info.get('access_url')
|
||||
if not access_url:
|
||||
detail = _("No access_url in connection_info. "
|
||||
"Cannot validate protocol")
|
||||
raise exception.ValidationError(detail=detail)
|
||||
expected_protos = [urlparse.urlparse(access_url).scheme]
|
||||
# NOTE: For serial consoles the expected protocol could be ws or
|
||||
# wss which correspond to http and https respectively in terms of
|
||||
# security.
|
||||
if 'ws' in expected_protos:
|
||||
expected_protos.append('http')
|
||||
if 'wss' in expected_protos:
|
||||
expected_protos.append('https')
|
||||
|
||||
return origin_proto in expected_protos
|
||||
|
||||
def new_websocket_client(self):
|
||||
"""Called after a new WebSocket connection has been established."""
|
||||
# Reopen the eventlet hub to make sure we don't share an epoll
|
||||
# fd with parent and/or siblings, which would be bad
|
||||
from eventlet import hubs
|
||||
hubs.use_hub()
|
||||
|
||||
# The nova expected behavior is to have token
|
||||
# passed to the method GET of the request
|
||||
parse = urlparse.urlparse(self.path)
|
||||
if parse.scheme not in ('http', 'https'):
|
||||
# From a bug in urlparse in Python < 2.7.4 we cannot support
|
||||
# special schemes (cf: http://bugs.python.org/issue9374)
|
||||
if sys.version_info < (2, 7, 4):
|
||||
raise exception.NovaException(
|
||||
_("We do not support scheme '%s' under Python < 2.7.4, "
|
||||
"please use http or https") % parse.scheme)
|
||||
|
||||
query = parse.query
|
||||
token = urlparse.parse_qs(query).get("token", [""]).pop()
|
||||
if not token:
|
||||
# NoVNC uses it's own convention that forward token
|
||||
# from the request to a cookie header, we should check
|
||||
# also for this behavior
|
||||
hcookie = self.headers.get('cookie')
|
||||
if hcookie:
|
||||
cookie = Cookie.SimpleCookie()
|
||||
for hcookie_part in hcookie.split(';'):
|
||||
hcookie_part = hcookie_part.lstrip()
|
||||
try:
|
||||
cookie.load(hcookie_part)
|
||||
except Cookie.CookieError:
|
||||
# NOTE(stgleb): Do not print out cookie content
|
||||
# for security reasons.
|
||||
LOG.warning('Found malformed cookie')
|
||||
else:
|
||||
if 'token' in cookie:
|
||||
token = cookie['token'].value
|
||||
|
||||
ctxt = context.get_admin_context()
|
||||
rpcapi = consoleauth_rpcapi.ConsoleAuthAPI()
|
||||
connect_info = rpcapi.check_token(ctxt, token=token)
|
||||
|
||||
if not connect_info:
|
||||
raise exception.InvalidToken(token=token)
|
||||
|
||||
# Verify Origin
|
||||
expected_origin_hostname = self.headers.get('Host')
|
||||
if ':' in expected_origin_hostname:
|
||||
e = expected_origin_hostname
|
||||
if '[' in e and ']' in e:
|
||||
expected_origin_hostname = e.split(']')[0][1:]
|
||||
else:
|
||||
expected_origin_hostname = e.split(':')[0]
|
||||
expected_origin_hostnames = CONF.console.allowed_origins
|
||||
expected_origin_hostnames.append(expected_origin_hostname)
|
||||
origin_url = self.headers.get('Origin')
|
||||
# missing origin header indicates non-browser client which is OK
|
||||
if origin_url is not None:
|
||||
origin = urlparse.urlparse(origin_url)
|
||||
origin_hostname = origin.hostname
|
||||
origin_scheme = origin.scheme
|
||||
if origin_hostname == '' or origin_scheme == '':
|
||||
detail = _("Origin header not valid.")
|
||||
raise exception.ValidationError(detail=detail)
|
||||
if origin_hostname not in expected_origin_hostnames:
|
||||
detail = _("Origin header does not match this host.")
|
||||
raise exception.ValidationError(detail=detail)
|
||||
if not self.verify_origin_proto(connect_info, origin_scheme):
|
||||
detail = _("Origin header protocol does not match this host.")
|
||||
raise exception.ValidationError(detail=detail)
|
||||
|
||||
self.msg(_('connect info: %s'), str(connect_info))
|
||||
host = connect_info['host']
|
||||
port = int(connect_info['port'])
|
||||
|
||||
# Connect to the target
|
||||
self.msg(_("connecting to: %(host)s:%(port)s") % {'host': host,
|
||||
'port': port})
|
||||
tsock = self.socket(host, port, connect=True)
|
||||
|
||||
# Handshake as necessary
|
||||
if connect_info.get('internal_access_path'):
|
||||
tsock.send("CONNECT %s HTTP/1.1\r\n\r\n" %
|
||||
connect_info['internal_access_path'])
|
||||
end_token = "\r\n\r\n"
|
||||
while True:
|
||||
data = tsock.recv(4096, socket.MSG_PEEK)
|
||||
token_loc = data.find(end_token)
|
||||
if token_loc != -1:
|
||||
if data.split("\r\n")[0].find("200") == -1:
|
||||
raise exception.InvalidConnectionInfo()
|
||||
# remove the response from recv buffer
|
||||
tsock.recv(token_loc + len(end_token))
|
||||
break
|
||||
|
||||
# Start proxying
|
||||
try:
|
||||
self.do_proxy(tsock)
|
||||
except Exception:
|
||||
if tsock:
|
||||
tsock.shutdown(socket.SHUT_RDWR)
|
||||
tsock.close()
|
||||
self.vmsg(_("%(host)s:%(port)s: "
|
||||
"Websocket client or target closed") %
|
||||
{'host': host, 'port': port})
|
||||
raise
|
||||
|
||||
|
||||
class NovaProxyRequestHandler(NovaProxyRequestHandlerBase,
|
||||
websockify.ProxyRequestHandler):
|
||||
def __init__(self, *args, **kwargs):
|
||||
websockify.ProxyRequestHandler.__init__(self, *args, **kwargs)
|
||||
|
||||
def socket(self, *args, **kwargs):
|
||||
return websockify.WebSocketServer.socket(*args, **kwargs)
|
||||
|
||||
|
||||
class NovaWebSocketProxy(websockify.WebSocketProxy):
|
||||
@staticmethod
|
||||
def get_logger():
|
||||
return LOG
|
|
@ -0,0 +1,144 @@
|
|||
#!/usr/bin/env python
|
||||
# Copyright (c) 2012 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Auth Components for Consoles."""
|
||||
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from nova import cache_utils
|
||||
from nova.cells import rpcapi as cells_rpcapi
|
||||
from nova.compute import rpcapi as compute_rpcapi
|
||||
import nova.conf
|
||||
from nova import context as nova_context
|
||||
from nova import manager
|
||||
from nova import objects
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
class ConsoleAuthManager(manager.Manager):
|
||||
"""Manages token based authentication."""
|
||||
|
||||
target = messaging.Target(version='2.1')
|
||||
|
||||
def __init__(self, scheduler_driver=None, *args, **kwargs):
|
||||
super(ConsoleAuthManager, self).__init__(service_name='consoleauth',
|
||||
*args, **kwargs)
|
||||
self._mc = None
|
||||
self._mc_instance = None
|
||||
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
|
||||
self.cells_rpcapi = cells_rpcapi.CellsAPI()
|
||||
|
||||
@property
|
||||
def mc(self):
|
||||
if self._mc is None:
|
||||
self._mc = cache_utils.get_client(CONF.consoleauth.token_ttl)
|
||||
return self._mc
|
||||
|
||||
@property
|
||||
def mc_instance(self):
|
||||
if self._mc_instance is None:
|
||||
self._mc_instance = cache_utils.get_client()
|
||||
return self._mc_instance
|
||||
|
||||
def reset(self):
|
||||
LOG.info('Reloading compute RPC API')
|
||||
compute_rpcapi.LAST_VERSION = None
|
||||
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
|
||||
|
||||
def _get_tokens_for_instance(self, instance_uuid):
|
||||
tokens_str = self.mc_instance.get(instance_uuid.encode('UTF-8'))
|
||||
if not tokens_str:
|
||||
tokens = []
|
||||
else:
|
||||
tokens = jsonutils.loads(tokens_str)
|
||||
return tokens
|
||||
|
||||
def authorize_console(self, context, token, console_type, host, port,
|
||||
internal_access_path, instance_uuid,
|
||||
access_url=None):
|
||||
|
||||
token_dict = {'token': token,
|
||||
'instance_uuid': instance_uuid,
|
||||
'console_type': console_type,
|
||||
'host': host,
|
||||
'port': port,
|
||||
'internal_access_path': internal_access_path,
|
||||
'access_url': access_url,
|
||||
'last_activity_at': time.time()}
|
||||
data = jsonutils.dumps(token_dict)
|
||||
|
||||
self.mc.set(token.encode('UTF-8'), data)
|
||||
tokens = self._get_tokens_for_instance(instance_uuid)
|
||||
|
||||
# Remove the expired tokens from cache.
|
||||
token_values = self.mc.get_multi(
|
||||
[tok.encode('UTF-8') for tok in tokens])
|
||||
tokens = [name for name, value in zip(tokens, token_values)
|
||||
if value is not None]
|
||||
tokens.append(token)
|
||||
|
||||
self.mc_instance.set(instance_uuid.encode('UTF-8'),
|
||||
jsonutils.dumps(tokens))
|
||||
|
||||
LOG.info("Received Token: %(token)s, %(token_dict)s",
|
||||
{'token': token, 'token_dict': token_dict})
|
||||
|
||||
def _validate_token(self, context, token):
|
||||
instance_uuid = token['instance_uuid']
|
||||
if instance_uuid is None:
|
||||
return False
|
||||
|
||||
# NOTE(comstud): consoleauth was meant to run in API cells. So,
|
||||
# if cells is enabled, we must call down to the child cell for
|
||||
# the instance.
|
||||
if CONF.cells.enable:
|
||||
return self.cells_rpcapi.validate_console_port(context,
|
||||
instance_uuid, token['port'], token['console_type'])
|
||||
|
||||
mapping = objects.InstanceMapping.get_by_instance_uuid(context,
|
||||
instance_uuid)
|
||||
with nova_context.target_cell(context, mapping.cell_mapping) as cctxt:
|
||||
instance = objects.Instance.get_by_uuid(cctxt, instance_uuid)
|
||||
|
||||
return self.compute_rpcapi.validate_console_port(
|
||||
cctxt,
|
||||
instance,
|
||||
token['port'],
|
||||
token['console_type'])
|
||||
|
||||
def check_token(self, context, token):
|
||||
token_str = self.mc.get(token.encode('UTF-8'))
|
||||
token_valid = (token_str is not None)
|
||||
LOG.info("Checking Token: %(token)s, %(token_valid)s",
|
||||
{'token': token, 'token_valid': token_valid})
|
||||
if token_valid:
|
||||
token = jsonutils.loads(token_str)
|
||||
if self._validate_token(context, token):
|
||||
return token
|
||||
|
||||
def delete_tokens_for_instance(self, context, instance_uuid):
|
||||
tokens = self._get_tokens_for_instance(instance_uuid)
|
||||
self.mc.delete_multi(
|
||||
[tok.encode('UTF-8') for tok in tokens])
|
||||
self.mc_instance.delete(instance_uuid.encode('UTF-8'))
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,617 @@
|
|||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
from sqlalchemy.orm import contains_eager
|
||||
from sqlalchemy.orm import joinedload
|
||||
from sqlalchemy.sql import func
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
from nova.compute import utils as compute_utils
|
||||
from nova import db
|
||||
from nova.db.sqlalchemy import api as db_api
|
||||
from nova.db.sqlalchemy import api_models
|
||||
from nova.db.sqlalchemy import models as main_models
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
from nova.objects import base
|
||||
from nova.objects import fields
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DEPRECATED_FIELDS = ['deleted', 'deleted_at']
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _aggregate_get_from_db(context, aggregate_id):
|
||||
query = context.session.query(api_models.Aggregate).\
|
||||
options(joinedload('_hosts')).\
|
||||
options(joinedload('_metadata'))
|
||||
query = query.filter(api_models.Aggregate.id == aggregate_id)
|
||||
|
||||
aggregate = query.first()
|
||||
|
||||
if not aggregate:
|
||||
raise exception.AggregateNotFound(aggregate_id=aggregate_id)
|
||||
|
||||
return aggregate
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _aggregate_get_from_db_by_uuid(context, aggregate_uuid):
|
||||
query = context.session.query(api_models.Aggregate).\
|
||||
options(joinedload('_hosts')).\
|
||||
options(joinedload('_metadata'))
|
||||
query = query.filter(api_models.Aggregate.uuid == aggregate_uuid)
|
||||
|
||||
aggregate = query.first()
|
||||
|
||||
if not aggregate:
|
||||
raise exception.AggregateNotFound(aggregate_id=aggregate_uuid)
|
||||
|
||||
return aggregate
|
||||
|
||||
|
||||
def _host_add_to_db(context, aggregate_id, host):
|
||||
try:
|
||||
with db_api.api_context_manager.writer.using(context):
|
||||
# Check to see if the aggregate exists
|
||||
_aggregate_get_from_db(context, aggregate_id)
|
||||
|
||||
host_ref = api_models.AggregateHost()
|
||||
host_ref.update({"host": host, "aggregate_id": aggregate_id})
|
||||
host_ref.save(context.session)
|
||||
return host_ref
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.AggregateHostExists(host=host,
|
||||
aggregate_id=aggregate_id)
|
||||
|
||||
|
||||
def _host_delete_from_db(context, aggregate_id, host):
|
||||
count = 0
|
||||
with db_api.api_context_manager.writer.using(context):
|
||||
# Check to see if the aggregate exists
|
||||
_aggregate_get_from_db(context, aggregate_id)
|
||||
|
||||
query = context.session.query(api_models.AggregateHost)
|
||||
query = query.filter(api_models.AggregateHost.aggregate_id ==
|
||||
aggregate_id)
|
||||
count = query.filter_by(host=host).delete()
|
||||
|
||||
if count == 0:
|
||||
raise exception.AggregateHostNotFound(aggregate_id=aggregate_id,
|
||||
host=host)
|
||||
|
||||
|
||||
def _metadata_add_to_db(context, aggregate_id, metadata, max_retries=10,
|
||||
set_delete=False):
|
||||
all_keys = metadata.keys()
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
with db_api.api_context_manager.writer.using(context):
|
||||
query = context.session.query(api_models.AggregateMetadata).\
|
||||
filter_by(aggregate_id=aggregate_id)
|
||||
|
||||
if set_delete:
|
||||
query.filter(~api_models.AggregateMetadata.key.
|
||||
in_(all_keys)).\
|
||||
delete(synchronize_session=False)
|
||||
|
||||
already_existing_keys = set()
|
||||
if all_keys:
|
||||
query = query.filter(
|
||||
api_models.AggregateMetadata.key.in_(all_keys))
|
||||
for meta_ref in query.all():
|
||||
key = meta_ref.key
|
||||
meta_ref.update({"value": metadata[key]})
|
||||
already_existing_keys.add(key)
|
||||
|
||||
new_entries = []
|
||||
for key, value in metadata.items():
|
||||
if key in already_existing_keys:
|
||||
continue
|
||||
new_entries.append({"key": key,
|
||||
"value": value,
|
||||
"aggregate_id": aggregate_id})
|
||||
if new_entries:
|
||||
context.session.execute(
|
||||
api_models.AggregateMetadata.__table__.insert(),
|
||||
new_entries)
|
||||
|
||||
return metadata
|
||||
except db_exc.DBDuplicateEntry:
|
||||
# a concurrent transaction has been committed,
|
||||
# try again unless this was the last attempt
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
if attempt < max_retries - 1:
|
||||
ctxt.reraise = False
|
||||
else:
|
||||
msg = _("Add metadata failed for aggregate %(id)s "
|
||||
"after %(retries)s retries") % \
|
||||
{"id": aggregate_id, "retries": max_retries}
|
||||
LOG.warning(msg)
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _metadata_delete_from_db(context, aggregate_id, key):
|
||||
# Check to see if the aggregate exists
|
||||
_aggregate_get_from_db(context, aggregate_id)
|
||||
|
||||
query = context.session.query(api_models.AggregateMetadata)
|
||||
query = query.filter(api_models.AggregateMetadata.aggregate_id ==
|
||||
aggregate_id)
|
||||
count = query.filter_by(key=key).delete()
|
||||
|
||||
if count == 0:
|
||||
raise exception.AggregateMetadataNotFound(
|
||||
aggregate_id=aggregate_id, metadata_key=key)
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _aggregate_create_in_db(context, values, metadata=None):
|
||||
query = context.session.query(api_models.Aggregate)
|
||||
query = query.filter(api_models.Aggregate.name == values['name'])
|
||||
aggregate = query.first()
|
||||
|
||||
if not aggregate:
|
||||
aggregate = api_models.Aggregate()
|
||||
aggregate.update(values)
|
||||
aggregate.save(context.session)
|
||||
# We don't want these to be lazy loaded later. We know there is
|
||||
# nothing here since we just created this aggregate.
|
||||
aggregate._hosts = []
|
||||
aggregate._metadata = []
|
||||
else:
|
||||
raise exception.AggregateNameExists(aggregate_name=values['name'])
|
||||
if metadata:
|
||||
_metadata_add_to_db(context, aggregate.id, metadata)
|
||||
context.session.expire(aggregate, ['_metadata'])
|
||||
aggregate._metadata
|
||||
|
||||
return aggregate
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _aggregate_delete_from_db(context, aggregate_id):
|
||||
# Delete Metadata first
|
||||
context.session.query(api_models.AggregateMetadata).\
|
||||
filter_by(aggregate_id=aggregate_id).\
|
||||
delete()
|
||||
|
||||
count = context.session.query(api_models.Aggregate).\
|
||||
filter(api_models.Aggregate.id == aggregate_id).\
|
||||
delete()
|
||||
|
||||
if count == 0:
|
||||
raise exception.AggregateNotFound(aggregate_id=aggregate_id)
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _aggregate_update_to_db(context, aggregate_id, values):
|
||||
aggregate = _aggregate_get_from_db(context, aggregate_id)
|
||||
|
||||
set_delete = True
|
||||
if "availability_zone" in values:
|
||||
az = values.pop('availability_zone')
|
||||
if 'metadata' not in values:
|
||||
values['metadata'] = {'availability_zone': az}
|
||||
set_delete = False
|
||||
else:
|
||||
values['metadata']['availability_zone'] = az
|
||||
metadata = values.get('metadata')
|
||||
if metadata is not None:
|
||||
_metadata_add_to_db(context, aggregate_id, values.pop('metadata'),
|
||||
set_delete=set_delete)
|
||||
|
||||
aggregate.update(values)
|
||||
try:
|
||||
aggregate.save(context.session)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
if 'name' in values:
|
||||
raise exception.AggregateNameExists(
|
||||
aggregate_name=values['name'])
|
||||
else:
|
||||
raise
|
||||
return _aggregate_get_from_db(context, aggregate_id)
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class Aggregate(base.NovaPersistentObject, base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: String attributes updated to support unicode
|
||||
# Version 1.2: Added uuid field
|
||||
# Version 1.3: Added get_by_uuid method
|
||||
VERSION = '1.3'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
'uuid': fields.UUIDField(nullable=False),
|
||||
'name': fields.StringField(),
|
||||
'hosts': fields.ListOfStringsField(nullable=True),
|
||||
'metadata': fields.DictOfStringsField(nullable=True),
|
||||
}
|
||||
|
||||
obj_extra_fields = ['availability_zone']
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Aggregate, self).__init__(*args, **kwargs)
|
||||
self._in_api = False
|
||||
|
||||
@staticmethod
|
||||
def _from_db_object(context, aggregate, db_aggregate):
|
||||
for key in aggregate.fields:
|
||||
if key == 'metadata':
|
||||
db_key = 'metadetails'
|
||||
elif key in DEPRECATED_FIELDS and key not in db_aggregate:
|
||||
continue
|
||||
else:
|
||||
db_key = key
|
||||
setattr(aggregate, key, db_aggregate[db_key])
|
||||
|
||||
# NOTE: This can be removed when we remove compatibility with
|
||||
# the old aggregate model.
|
||||
if any(f not in db_aggregate for f in DEPRECATED_FIELDS):
|
||||
aggregate.deleted_at = None
|
||||
aggregate.deleted = False
|
||||
|
||||
aggregate._context = context
|
||||
aggregate.obj_reset_changes()
|
||||
|
||||
return aggregate
|
||||
|
||||
def _assert_no_hosts(self, action):
|
||||
if 'hosts' in self.obj_what_changed():
|
||||
raise exception.ObjectActionError(
|
||||
action=action,
|
||||
reason='hosts updated inline')
|
||||
|
||||
@property
|
||||
def in_api(self):
|
||||
if self._in_api:
|
||||
return True
|
||||
else:
|
||||
try:
|
||||
_aggregate_get_from_db(self._context, self.id)
|
||||
self._in_api = True
|
||||
except exception.AggregateNotFound:
|
||||
pass
|
||||
return self._in_api
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_id(cls, context, aggregate_id):
|
||||
try:
|
||||
db_aggregate = _aggregate_get_from_db(context, aggregate_id)
|
||||
except exception.AggregateNotFound:
|
||||
db_aggregate = db.aggregate_get(context, aggregate_id)
|
||||
return cls._from_db_object(context, cls(), db_aggregate)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_uuid(cls, context, aggregate_uuid):
|
||||
try:
|
||||
db_aggregate = _aggregate_get_from_db_by_uuid(context,
|
||||
aggregate_uuid)
|
||||
except exception.AggregateNotFound:
|
||||
db_aggregate = db.aggregate_get_by_uuid(context, aggregate_uuid)
|
||||
return cls._from_db_object(context, cls(), db_aggregate)
|
||||
|
||||
@staticmethod
|
||||
@db_api.pick_context_manager_reader
|
||||
def _ensure_migrated(context):
|
||||
result = context.session.query(main_models.Aggregate).\
|
||||
filter_by(deleted=0).count()
|
||||
if result:
|
||||
LOG.warning(
|
||||
'Main database contains %(count)i unmigrated aggregates',
|
||||
{'count': result})
|
||||
return result == 0
|
||||
|
||||
@base.remotable
|
||||
def create(self):
|
||||
if self.obj_attr_is_set('id'):
|
||||
raise exception.ObjectActionError(action='create',
|
||||
reason='already created')
|
||||
|
||||
# NOTE(mdoff): Once we have made it past a point where we know
|
||||
# all aggregates have been migrated, we can remove this. Ideally
|
||||
# in Ocata with a blocker migration to be sure.
|
||||
if not self._ensure_migrated(self._context):
|
||||
raise exception.ObjectActionError(
|
||||
action='create',
|
||||
reason='main database still contains aggregates')
|
||||
|
||||
self._assert_no_hosts('create')
|
||||
updates = self.obj_get_changes()
|
||||
payload = dict(updates)
|
||||
if 'metadata' in updates:
|
||||
# NOTE(danms): For some reason the notification format is weird
|
||||
payload['meta_data'] = payload.pop('metadata')
|
||||
if 'uuid' not in updates:
|
||||
updates['uuid'] = uuidutils.generate_uuid()
|
||||
self.uuid = updates['uuid']
|
||||
LOG.debug('Generated uuid %(uuid)s for aggregate',
|
||||
dict(uuid=updates['uuid']))
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"create.start",
|
||||
payload)
|
||||
compute_utils.notify_about_aggregate_action(
|
||||
context=self._context,
|
||||
aggregate=self,
|
||||
action=fields.NotificationAction.CREATE,
|
||||
phase=fields.NotificationPhase.START)
|
||||
|
||||
metadata = updates.pop('metadata', None)
|
||||
db_aggregate = _aggregate_create_in_db(self._context, updates,
|
||||
metadata=metadata)
|
||||
self._from_db_object(self._context, self, db_aggregate)
|
||||
payload['aggregate_id'] = self.id
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"create.end",
|
||||
payload)
|
||||
compute_utils.notify_about_aggregate_action(
|
||||
context=self._context,
|
||||
aggregate=self,
|
||||
action=fields.NotificationAction.CREATE,
|
||||
phase=fields.NotificationPhase.END)
|
||||
|
||||
@base.remotable
|
||||
def save(self):
|
||||
self._assert_no_hosts('save')
|
||||
updates = self.obj_get_changes()
|
||||
|
||||
payload = {'aggregate_id': self.id}
|
||||
if 'metadata' in updates:
|
||||
payload['meta_data'] = updates['metadata']
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"updateprop.start",
|
||||
payload)
|
||||
updates.pop('id', None)
|
||||
try:
|
||||
db_aggregate = _aggregate_update_to_db(self._context,
|
||||
self.id, updates)
|
||||
except exception.AggregateNotFound:
|
||||
db_aggregate = db.aggregate_update(self._context, self.id, updates)
|
||||
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"updateprop.end",
|
||||
payload)
|
||||
self._from_db_object(self._context, self, db_aggregate)
|
||||
|
||||
@base.remotable
|
||||
def update_metadata(self, updates):
|
||||
if self.in_api:
|
||||
metadata_delete = _metadata_delete_from_db
|
||||
metadata_add = _metadata_add_to_db
|
||||
else:
|
||||
metadata_delete = db.aggregate_metadata_delete
|
||||
metadata_add = db.aggregate_metadata_add
|
||||
|
||||
payload = {'aggregate_id': self.id,
|
||||
'meta_data': updates}
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"updatemetadata.start",
|
||||
payload)
|
||||
to_add = {}
|
||||
for key, value in updates.items():
|
||||
if value is None:
|
||||
try:
|
||||
metadata_delete(self._context, self.id, key)
|
||||
except exception.AggregateMetadataNotFound:
|
||||
pass
|
||||
try:
|
||||
self.metadata.pop(key)
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
to_add[key] = value
|
||||
self.metadata[key] = value
|
||||
metadata_add(self._context, self.id, to_add)
|
||||
compute_utils.notify_about_aggregate_update(self._context,
|
||||
"updatemetadata.end",
|
||||
payload)
|
||||
self.obj_reset_changes(fields=['metadata'])
|
||||
|
||||
@base.remotable
|
||||
def destroy(self):
|
||||
try:
|
||||
_aggregate_delete_from_db(self._context, self.id)
|
||||
except exception.AggregateNotFound:
|
||||
db.aggregate_delete(self._context, self.id)
|
||||
|
||||
@base.remotable
|
||||
def add_host(self, host):
|
||||
if self.in_api:
|
||||
_host_add_to_db(self._context, self.id, host)
|
||||
else:
|
||||
db.aggregate_host_add(self._context, self.id, host)
|
||||
|
||||
if self.hosts is None:
|
||||
self.hosts = []
|
||||
self.hosts.append(host)
|
||||
self.obj_reset_changes(fields=['hosts'])
|
||||
|
||||
@base.remotable
|
||||
def delete_host(self, host):
|
||||
if self.in_api:
|
||||
_host_delete_from_db(self._context, self.id, host)
|
||||
else:
|
||||
db.aggregate_host_delete(self._context, self.id, host)
|
||||
|
||||
self.hosts.remove(host)
|
||||
self.obj_reset_changes(fields=['hosts'])
|
||||
|
||||
@property
|
||||
def availability_zone(self):
|
||||
return self.metadata.get('availability_zone', None)
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_all_from_db(context):
|
||||
query = context.session.query(api_models.Aggregate).\
|
||||
options(joinedload('_hosts')).\
|
||||
options(joinedload('_metadata'))
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_by_host_from_db(context, host, key=None):
|
||||
query = context.session.query(api_models.Aggregate).\
|
||||
options(joinedload('_hosts')).\
|
||||
options(joinedload('_metadata'))
|
||||
query = query.join('_hosts')
|
||||
query = query.filter(api_models.AggregateHost.host == host)
|
||||
|
||||
if key:
|
||||
query = query.join("_metadata").filter(
|
||||
api_models.AggregateMetadata.key == key)
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_by_metadata_key_from_db(context, key):
|
||||
query = context.session.query(api_models.Aggregate)
|
||||
query = query.join("_metadata")
|
||||
query = query.filter(api_models.AggregateMetadata.key == key)
|
||||
query = query.options(contains_eager("_metadata"))
|
||||
query = query.options(joinedload("_hosts"))
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class AggregateList(base.ObjectListBase, base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: Added key argument to get_by_host()
|
||||
# Aggregate <= version 1.1
|
||||
# Version 1.2: Added get_by_metadata_key
|
||||
VERSION = '1.2'
|
||||
|
||||
fields = {
|
||||
'objects': fields.ListOfObjectsField('Aggregate'),
|
||||
}
|
||||
|
||||
# NOTE(mdoff): Calls to this can be removed when we remove
|
||||
# compatibility with the old aggregate model.
|
||||
@staticmethod
|
||||
def _fill_deprecated(db_aggregate):
|
||||
db_aggregate['deleted_at'] = None
|
||||
db_aggregate['deleted'] = False
|
||||
return db_aggregate
|
||||
|
||||
@classmethod
|
||||
def _filter_db_aggregates(cls, db_aggregates, hosts):
|
||||
if not isinstance(hosts, set):
|
||||
hosts = set(hosts)
|
||||
filtered_aggregates = []
|
||||
for db_aggregate in db_aggregates:
|
||||
for host in db_aggregate['hosts']:
|
||||
if host in hosts:
|
||||
filtered_aggregates.append(db_aggregate)
|
||||
break
|
||||
return filtered_aggregates
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_all(cls, context):
|
||||
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
|
||||
_get_all_from_db(context)]
|
||||
db_aggregates = db.aggregate_get_all(context)
|
||||
return base.obj_make_list(context, cls(context), objects.Aggregate,
|
||||
db_aggregates + api_db_aggregates)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_host(cls, context, host, key=None):
|
||||
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
|
||||
_get_by_host_from_db(context, host, key=key)]
|
||||
db_aggregates = db.aggregate_get_by_host(context, host, key=key)
|
||||
return base.obj_make_list(context, cls(context), objects.Aggregate,
|
||||
db_aggregates + api_db_aggregates)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_metadata_key(cls, context, key, hosts=None):
|
||||
api_db_aggregates = [cls._fill_deprecated(agg) for agg in
|
||||
_get_by_metadata_key_from_db(context, key=key)]
|
||||
db_aggregates = db.aggregate_get_by_metadata_key(context, key=key)
|
||||
|
||||
all_aggregates = db_aggregates + api_db_aggregates
|
||||
if hosts is not None:
|
||||
all_aggregates = cls._filter_db_aggregates(all_aggregates, hosts)
|
||||
return base.obj_make_list(context, cls(context), objects.Aggregate,
|
||||
all_aggregates)
|
||||
|
||||
|
||||
@db_api.pick_context_manager_reader
|
||||
def _get_main_db_aggregate_ids(context, limit):
|
||||
from nova.db.sqlalchemy import models
|
||||
return [x[0] for x in context.session.query(models.Aggregate.id).
|
||||
filter_by(deleted=0).
|
||||
limit(limit)]
|
||||
|
||||
|
||||
def migrate_aggregates(ctxt, count):
|
||||
main_db_ids = _get_main_db_aggregate_ids(ctxt, count)
|
||||
if not main_db_ids:
|
||||
return 0, 0
|
||||
|
||||
count_all = len(main_db_ids)
|
||||
count_hit = 0
|
||||
|
||||
for aggregate_id in main_db_ids:
|
||||
try:
|
||||
aggregate = Aggregate.get_by_id(ctxt, aggregate_id)
|
||||
remove = ['metadata', 'hosts']
|
||||
values = {field: getattr(aggregate, field)
|
||||
for field in aggregate.fields if field not in remove}
|
||||
_aggregate_create_in_db(ctxt, values, metadata=aggregate.metadata)
|
||||
for host in aggregate.hosts:
|
||||
_host_add_to_db(ctxt, aggregate_id, host)
|
||||
count_hit += 1
|
||||
db.aggregate_delete(ctxt, aggregate.id)
|
||||
except exception.AggregateNotFound:
|
||||
LOG.warning(
|
||||
'Aggregate id %(id)i disappeared during migration',
|
||||
{'id': aggregate_id})
|
||||
except (exception.AggregateNameExists) as e:
|
||||
LOG.error(six.text_type(e))
|
||||
|
||||
return count_all, count_hit
|
||||
|
||||
|
||||
def _adjust_autoincrement(context, value):
|
||||
engine = db_api.get_api_engine()
|
||||
if engine.name == 'postgresql':
|
||||
# NOTE(danms): If we migrated some aggregates in the above function,
|
||||
# then we will have confused postgres' sequence for the autoincrement
|
||||
# primary key. MySQL does not care about this, but since postgres does,
|
||||
# we need to reset this to avoid a failure on the next aggregate
|
||||
# creation.
|
||||
engine.execute(
|
||||
text('ALTER SEQUENCE aggregates_id_seq RESTART WITH %i;' % (
|
||||
value)))
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_max_aggregate_id(context):
|
||||
return context.session.query(func.max(api_models.Aggregate.id)).one()[0]
|
||||
|
||||
|
||||
def migrate_aggregate_reset_autoincrement(ctxt, count):
|
||||
max_id = _get_max_aggregate_id(ctxt) or 0
|
||||
_adjust_autoincrement(ctxt, max_id + 1)
|
||||
return 0, 0
|
|
@ -0,0 +1,120 @@
|
|||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from nova.cells import opts as cells_opts
|
||||
from nova.cells import rpcapi as cells_rpcapi
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova.objects import base
|
||||
from nova.objects import fields
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO(berrange): Remove NovaObjectDictCompat
|
||||
@base.NovaObjectRegistry.register
|
||||
class InstanceFault(base.NovaPersistentObject, base.NovaObject,
|
||||
base.NovaObjectDictCompat):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: String attributes updated to support unicode
|
||||
# Version 1.2: Added create()
|
||||
VERSION = '1.2'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
'instance_uuid': fields.UUIDField(),
|
||||
'code': fields.IntegerField(),
|
||||
'message': fields.StringField(nullable=True),
|
||||
'details': fields.StringField(nullable=True),
|
||||
'host': fields.StringField(nullable=True),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _from_db_object(context, fault, db_fault):
|
||||
# NOTE(danms): These are identical right now
|
||||
for key in fault.fields:
|
||||
fault[key] = db_fault[key]
|
||||
fault._context = context
|
||||
fault.obj_reset_changes()
|
||||
return fault
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_latest_for_instance(cls, context, instance_uuid):
|
||||
db_faults = db.instance_fault_get_by_instance_uuids(context,
|
||||
[instance_uuid])
|
||||
if instance_uuid in db_faults and db_faults[instance_uuid]:
|
||||
return cls._from_db_object(context, cls(),
|
||||
db_faults[instance_uuid][0])
|
||||
|
||||
@base.remotable
|
||||
def create(self):
|
||||
if self.obj_attr_is_set('id'):
|
||||
raise exception.ObjectActionError(action='create',
|
||||
reason='already created')
|
||||
values = {
|
||||
'instance_uuid': self.instance_uuid,
|
||||
'code': self.code,
|
||||
'message': self.message,
|
||||
'details': self.details,
|
||||
'host': self.host,
|
||||
}
|
||||
db_fault = db.instance_fault_create(self._context, values)
|
||||
self._from_db_object(self._context, self, db_fault)
|
||||
self.obj_reset_changes()
|
||||
# Cells should only try sending a message over to nova-cells
|
||||
# if cells is enabled and we're not the API cell. Otherwise,
|
||||
# if the API cell is calling this, we could end up with
|
||||
# infinite recursion.
|
||||
if cells_opts.get_cell_type() == 'compute':
|
||||
try:
|
||||
cells_rpcapi.CellsAPI().instance_fault_create_at_top(
|
||||
self._context, db_fault)
|
||||
except Exception:
|
||||
LOG.exception("Failed to notify cells of instance fault")
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class InstanceFaultList(base.ObjectListBase, base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
# InstanceFault <= version 1.1
|
||||
# Version 1.1: InstanceFault version 1.2
|
||||
# Version 1.2: Added get_latest_by_instance_uuids() method
|
||||
VERSION = '1.2'
|
||||
|
||||
fields = {
|
||||
'objects': fields.ListOfObjectsField('InstanceFault'),
|
||||
}
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_latest_by_instance_uuids(cls, context, instance_uuids):
|
||||
db_faultdict = db.instance_fault_get_by_instance_uuids(context,
|
||||
instance_uuids,
|
||||
latest=True)
|
||||
db_faultlist = itertools.chain(*db_faultdict.values())
|
||||
return base.obj_make_list(context, cls(context), objects.InstanceFault,
|
||||
db_faultlist)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_instance_uuids(cls, context, instance_uuids):
|
||||
db_faultdict = db.instance_fault_get_by_instance_uuids(context,
|
||||
instance_uuids)
|
||||
db_faultlist = itertools.chain(*db_faultdict.values())
|
||||
return base.obj_make_list(context, cls(context), objects.InstanceFault,
|
||||
db_faultlist)
|
|
@ -0,0 +1,592 @@
|
|||
# Copyright (c) 2013 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_utils import uuidutils
|
||||
from oslo_utils import versionutils
|
||||
from sqlalchemy.orm import contains_eager
|
||||
from sqlalchemy.orm import joinedload
|
||||
|
||||
from nova.compute import utils as compute_utils
|
||||
from nova import db
|
||||
from nova.db.sqlalchemy import api as db_api
|
||||
from nova.db.sqlalchemy import api_models
|
||||
from nova.db.sqlalchemy import models as main_models
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova.objects import base
|
||||
from nova.objects import fields
|
||||
|
||||
|
||||
LAZY_LOAD_FIELDS = ['hosts']
|
||||
|
||||
|
||||
def _instance_group_get_query(context, id_field=None, id=None):
|
||||
query = context.session.query(api_models.InstanceGroup).\
|
||||
options(joinedload('_policies')).\
|
||||
options(joinedload('_members'))
|
||||
if not context.is_admin:
|
||||
query = query.filter_by(project_id=context.project_id)
|
||||
if id and id_field:
|
||||
query = query.filter(id_field == id)
|
||||
return query
|
||||
|
||||
|
||||
def _instance_group_model_get_query(context, model_class, group_id):
|
||||
return context.session.query(model_class).filter_by(group_id=group_id)
|
||||
|
||||
|
||||
def _instance_group_model_add(context, model_class, items, item_models, field,
|
||||
group_id, append_to_models=None):
|
||||
models = []
|
||||
already_existing = set()
|
||||
for db_item in item_models:
|
||||
already_existing.add(getattr(db_item, field))
|
||||
models.append(db_item)
|
||||
for item in items:
|
||||
if item in already_existing:
|
||||
continue
|
||||
model = model_class()
|
||||
values = {'group_id': group_id}
|
||||
values[field] = item
|
||||
model.update(values)
|
||||
context.session.add(model)
|
||||
if append_to_models:
|
||||
append_to_models.append(model)
|
||||
models.append(model)
|
||||
return models
|
||||
|
||||
|
||||
def _instance_group_policies_add(context, group, policies):
|
||||
query = _instance_group_model_get_query(context,
|
||||
api_models.InstanceGroupPolicy,
|
||||
group.id)
|
||||
query = query.filter(
|
||||
api_models.InstanceGroupPolicy.policy.in_(set(policies)))
|
||||
return _instance_group_model_add(context, api_models.InstanceGroupPolicy,
|
||||
policies, query.all(), 'policy', group.id,
|
||||
append_to_models=group._policies)
|
||||
|
||||
|
||||
def _instance_group_members_add(context, group, members):
|
||||
query = _instance_group_model_get_query(context,
|
||||
api_models.InstanceGroupMember,
|
||||
group.id)
|
||||
query = query.filter(
|
||||
api_models.InstanceGroupMember.instance_uuid.in_(set(members)))
|
||||
return _instance_group_model_add(context, api_models.InstanceGroupMember,
|
||||
members, query.all(), 'instance_uuid',
|
||||
group.id, append_to_models=group._members)
|
||||
|
||||
|
||||
def _instance_group_members_add_by_uuid(context, group_uuid, members):
|
||||
# NOTE(melwitt): The condition on the join limits the number of members
|
||||
# returned to only those we wish to check as already existing.
|
||||
group = context.session.query(api_models.InstanceGroup).\
|
||||
outerjoin(api_models.InstanceGroupMember,
|
||||
api_models.InstanceGroupMember.instance_uuid.in_(set(members))).\
|
||||
filter(api_models.InstanceGroup.uuid == group_uuid).\
|
||||
options(contains_eager('_members')).first()
|
||||
if not group:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=group_uuid)
|
||||
return _instance_group_model_add(context, api_models.InstanceGroupMember,
|
||||
members, group._members, 'instance_uuid',
|
||||
group.id)
|
||||
|
||||
|
||||
# TODO(berrange): Remove NovaObjectDictCompat
|
||||
@base.NovaObjectRegistry.register
|
||||
class InstanceGroup(base.NovaPersistentObject, base.NovaObject,
|
||||
base.NovaObjectDictCompat):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: String attributes updated to support unicode
|
||||
# Version 1.2: Use list/dict helpers for policies, metadetails, members
|
||||
# Version 1.3: Make uuid a non-None real string
|
||||
# Version 1.4: Add add_members()
|
||||
# Version 1.5: Add get_hosts()
|
||||
# Version 1.6: Add get_by_name()
|
||||
# Version 1.7: Deprecate metadetails
|
||||
# Version 1.8: Add count_members_by_user()
|
||||
# Version 1.9: Add get_by_instance_uuid()
|
||||
# Version 1.10: Add hosts field
|
||||
VERSION = '1.10'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
|
||||
'user_id': fields.StringField(nullable=True),
|
||||
'project_id': fields.StringField(nullable=True),
|
||||
|
||||
'uuid': fields.UUIDField(),
|
||||
'name': fields.StringField(nullable=True),
|
||||
|
||||
'policies': fields.ListOfStringsField(nullable=True),
|
||||
'members': fields.ListOfStringsField(nullable=True),
|
||||
'hosts': fields.ListOfStringsField(nullable=True),
|
||||
}
|
||||
|
||||
def obj_make_compatible(self, primitive, target_version):
|
||||
target_version = versionutils.convert_version_to_tuple(target_version)
|
||||
if target_version < (1, 7):
|
||||
# NOTE(danms): Before 1.7, we had an always-empty
|
||||
# metadetails property
|
||||
primitive['metadetails'] = {}
|
||||
|
||||
@staticmethod
|
||||
def _from_db_object(context, instance_group, db_inst):
|
||||
"""Method to help with migration to objects.
|
||||
|
||||
Converts a database entity to a formal object.
|
||||
"""
|
||||
# Most of the field names match right now, so be quick
|
||||
for field in instance_group.fields:
|
||||
if field in LAZY_LOAD_FIELDS:
|
||||
continue
|
||||
# This is needed to handle db models from both the api
|
||||
# database and the main database. In the migration to
|
||||
# the api database, we have removed soft-delete, so
|
||||
# the object fields for delete must be filled in with
|
||||
# default values for db models from the api database.
|
||||
ignore = {'deleted': False,
|
||||
'deleted_at': None}
|
||||
if field in ignore and not hasattr(db_inst, field):
|
||||
instance_group[field] = ignore[field]
|
||||
else:
|
||||
instance_group[field] = db_inst[field]
|
||||
|
||||
instance_group._context = context
|
||||
instance_group.obj_reset_changes()
|
||||
return instance_group
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db_by_uuid(context, uuid):
|
||||
grp = _instance_group_get_query(context,
|
||||
id_field=api_models.InstanceGroup.uuid,
|
||||
id=uuid).first()
|
||||
if not grp:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=uuid)
|
||||
return grp
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db_by_id(context, id):
|
||||
grp = _instance_group_get_query(context,
|
||||
id_field=api_models.InstanceGroup.id,
|
||||
id=id).first()
|
||||
if not grp:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=id)
|
||||
return grp
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db_by_name(context, name):
|
||||
grp = _instance_group_get_query(context).filter_by(name=name).first()
|
||||
if not grp:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=name)
|
||||
return grp
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db_by_instance(context, instance_uuid):
|
||||
grp_member = context.session.query(api_models.InstanceGroupMember).\
|
||||
filter_by(instance_uuid=instance_uuid).first()
|
||||
if not grp_member:
|
||||
raise exception.InstanceGroupNotFound(group_uuid='')
|
||||
grp = InstanceGroup._get_from_db_by_id(context, grp_member.group_id)
|
||||
return grp
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.writer
|
||||
def _save_in_db(context, group_uuid, values):
|
||||
grp = _instance_group_get_query(context,
|
||||
id_field=api_models.InstanceGroup.uuid,
|
||||
id=group_uuid).first()
|
||||
if not grp:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=group_uuid)
|
||||
|
||||
values_copy = copy.copy(values)
|
||||
policies = values_copy.pop('policies', None)
|
||||
members = values_copy.pop('members', None)
|
||||
|
||||
grp.update(values_copy)
|
||||
|
||||
if policies is not None:
|
||||
_instance_group_policies_add(context, grp, policies)
|
||||
if members is not None:
|
||||
_instance_group_members_add(context, grp, members)
|
||||
|
||||
return grp
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.writer
|
||||
def _create_in_db(context, values, policies=None, members=None):
|
||||
try:
|
||||
group = api_models.InstanceGroup()
|
||||
group.update(values)
|
||||
group.save(context.session)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.InstanceGroupIdExists(group_uuid=values['uuid'])
|
||||
|
||||
if policies:
|
||||
group._policies = _instance_group_policies_add(context, group,
|
||||
policies)
|
||||
else:
|
||||
group._policies = []
|
||||
|
||||
if members:
|
||||
group._members = _instance_group_members_add(context, group,
|
||||
members)
|
||||
else:
|
||||
group._members = []
|
||||
|
||||
return group
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.writer
|
||||
def _destroy_in_db(context, group_uuid):
|
||||
qry = _instance_group_get_query(context,
|
||||
id_field=api_models.InstanceGroup.uuid,
|
||||
id=group_uuid)
|
||||
if qry.count() == 0:
|
||||
raise exception.InstanceGroupNotFound(group_uuid=group_uuid)
|
||||
|
||||
# Delete policies and members
|
||||
group_id = qry.first().id
|
||||
instance_models = [api_models.InstanceGroupPolicy,
|
||||
api_models.InstanceGroupMember]
|
||||
for model in instance_models:
|
||||
context.session.query(model).filter_by(group_id=group_id).delete()
|
||||
|
||||
qry.delete()
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.writer
|
||||
def _add_members_in_db(context, group_uuid, members):
|
||||
return _instance_group_members_add_by_uuid(context, group_uuid,
|
||||
members)
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.writer
|
||||
def _remove_members_in_db(context, group_id, instance_uuids):
|
||||
# There is no public method provided for removing members because the
|
||||
# user-facing API doesn't allow removal of instance group members. We
|
||||
# need to be able to remove members to address quota races.
|
||||
context.session.query(api_models.InstanceGroupMember).\
|
||||
filter_by(group_id=group_id).\
|
||||
filter(api_models.InstanceGroupMember.instance_uuid.
|
||||
in_(set(instance_uuids))).\
|
||||
delete(synchronize_session=False)
|
||||
|
||||
def obj_load_attr(self, attrname):
|
||||
# NOTE(sbauza): Only hosts could be lazy-loaded right now
|
||||
if attrname != 'hosts':
|
||||
raise exception.ObjectActionError(
|
||||
action='obj_load_attr', reason='unable to load %s' % attrname)
|
||||
|
||||
self.hosts = self.get_hosts()
|
||||
self.obj_reset_changes(['hosts'])
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_uuid(cls, context, uuid):
|
||||
db_group = None
|
||||
try:
|
||||
db_group = cls._get_from_db_by_uuid(context, uuid)
|
||||
except exception.InstanceGroupNotFound:
|
||||
pass
|
||||
if db_group is None:
|
||||
db_group = db.instance_group_get(context, uuid)
|
||||
return cls._from_db_object(context, cls(), db_group)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_name(cls, context, name):
|
||||
try:
|
||||
db_group = cls._get_from_db_by_name(context, name)
|
||||
except exception.InstanceGroupNotFound:
|
||||
igs = InstanceGroupList._get_main_by_project_id(context,
|
||||
context.project_id)
|
||||
for ig in igs:
|
||||
if ig.name == name:
|
||||
return ig
|
||||
raise exception.InstanceGroupNotFound(group_uuid=name)
|
||||
return cls._from_db_object(context, cls(), db_group)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_instance_uuid(cls, context, instance_uuid):
|
||||
db_group = None
|
||||
try:
|
||||
db_group = cls._get_from_db_by_instance(context, instance_uuid)
|
||||
except exception.InstanceGroupNotFound:
|
||||
pass
|
||||
if db_group is None:
|
||||
db_group = db.instance_group_get_by_instance(context,
|
||||
instance_uuid)
|
||||
return cls._from_db_object(context, cls(), db_group)
|
||||
|
||||
@classmethod
|
||||
def get_by_hint(cls, context, hint):
|
||||
if uuidutils.is_uuid_like(hint):
|
||||
return cls.get_by_uuid(context, hint)
|
||||
else:
|
||||
return cls.get_by_name(context, hint)
|
||||
|
||||
@base.remotable
|
||||
def save(self):
|
||||
"""Save updates to this instance group."""
|
||||
|
||||
updates = self.obj_get_changes()
|
||||
|
||||
# NOTE(sbauza): We do NOT save the set of compute nodes that an
|
||||
# instance group is connected to in this method. Instance groups are
|
||||
# implicitly connected to compute nodes when the
|
||||
# InstanceGroup.add_members() method is called, which adds the mapping
|
||||
# table entries.
|
||||
# So, since the only way to have hosts in the updates is to set that
|
||||
# field explicitly, we prefer to raise an Exception so the developer
|
||||
# knows he has to call obj_reset_changes(['hosts']) right after setting
|
||||
# the field.
|
||||
if 'hosts' in updates:
|
||||
raise exception.InstanceGroupSaveException(field='hosts')
|
||||
|
||||
if not updates:
|
||||
return
|
||||
|
||||
payload = dict(updates)
|
||||
payload['server_group_id'] = self.uuid
|
||||
|
||||
try:
|
||||
db_group = self._save_in_db(self._context, self.uuid, updates)
|
||||
except exception.InstanceGroupNotFound:
|
||||
db.instance_group_update(self._context, self.uuid, updates)
|
||||
db_group = db.instance_group_get(self._context, self.uuid)
|
||||
self._from_db_object(self._context, self, db_group)
|
||||
compute_utils.notify_about_server_group_update(self._context,
|
||||
"update", payload)
|
||||
|
||||
@base.remotable
|
||||
def refresh(self):
|
||||
"""Refreshes the instance group."""
|
||||
current = self.__class__.get_by_uuid(self._context, self.uuid)
|
||||
for field in self.fields:
|
||||
if self.obj_attr_is_set(field) and self[field] != current[field]:
|
||||
self[field] = current[field]
|
||||
self.obj_reset_changes()
|
||||
|
||||
def _create(self, skipcheck=False):
|
||||
# NOTE(danms): This is just for the migration routine, and
|
||||
# can be removed once we're no longer supporting the migration
|
||||
# of instance groups from the main to api database.
|
||||
if self.obj_attr_is_set('id'):
|
||||
raise exception.ObjectActionError(action='create',
|
||||
reason='already created')
|
||||
updates = self.obj_get_changes()
|
||||
payload = dict(updates)
|
||||
updates.pop('id', None)
|
||||
policies = updates.pop('policies', None)
|
||||
members = updates.pop('members', None)
|
||||
|
||||
if 'uuid' not in updates:
|
||||
self.uuid = uuidutils.generate_uuid()
|
||||
updates['uuid'] = self.uuid
|
||||
|
||||
if not skipcheck:
|
||||
try:
|
||||
db.instance_group_get(self._context, self.uuid)
|
||||
raise exception.ObjectActionError(
|
||||
action='create',
|
||||
reason='already created in main')
|
||||
except exception.InstanceGroupNotFound:
|
||||
pass
|
||||
db_group = self._create_in_db(self._context, updates,
|
||||
policies=policies,
|
||||
members=members)
|
||||
self._from_db_object(self._context, self, db_group)
|
||||
payload['server_group_id'] = self.uuid
|
||||
compute_utils.notify_about_server_group_update(self._context,
|
||||
"create", payload)
|
||||
compute_utils.notify_about_server_group_action(
|
||||
context=self._context,
|
||||
group=self,
|
||||
action=fields.NotificationAction.CREATE)
|
||||
|
||||
@base.remotable
|
||||
def create(self):
|
||||
self._create()
|
||||
|
||||
@base.remotable
|
||||
def destroy(self):
|
||||
payload = {'server_group_id': self.uuid}
|
||||
try:
|
||||
self._destroy_in_db(self._context, self.uuid)
|
||||
except exception.InstanceGroupNotFound:
|
||||
db.instance_group_delete(self._context, self.uuid)
|
||||
self.obj_reset_changes()
|
||||
compute_utils.notify_about_server_group_update(self._context,
|
||||
"delete", payload)
|
||||
compute_utils.notify_about_server_group_action(
|
||||
context=self._context,
|
||||
group=self,
|
||||
action=fields.NotificationAction.DELETE)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def add_members(cls, context, group_uuid, instance_uuids):
|
||||
payload = {'server_group_id': group_uuid,
|
||||
'instance_uuids': instance_uuids}
|
||||
try:
|
||||
members = cls._add_members_in_db(context, group_uuid,
|
||||
instance_uuids)
|
||||
members = [member['instance_uuid'] for member in members]
|
||||
except exception.InstanceGroupNotFound:
|
||||
members = db.instance_group_members_add(context, group_uuid,
|
||||
instance_uuids)
|
||||
compute_utils.notify_about_server_group_update(context,
|
||||
"addmember", payload)
|
||||
return list(members)
|
||||
|
||||
@base.remotable
|
||||
def get_hosts(self, exclude=None):
|
||||
"""Get a list of hosts for non-deleted instances in the group
|
||||
|
||||
This method allows you to get a list of the hosts where instances in
|
||||
this group are currently running. There's also an option to exclude
|
||||
certain instance UUIDs from this calculation.
|
||||
|
||||
"""
|
||||
filter_uuids = self.members
|
||||
if exclude:
|
||||
filter_uuids = set(filter_uuids) - set(exclude)
|
||||
filters = {'uuid': filter_uuids, 'deleted': False}
|
||||
instances = objects.InstanceList.get_by_filters(self._context,
|
||||
filters=filters)
|
||||
return list(set([instance.host for instance in instances
|
||||
if instance.host]))
|
||||
|
||||
@base.remotable
|
||||
def count_members_by_user(self, user_id):
|
||||
"""Count the number of instances in a group belonging to a user."""
|
||||
filter_uuids = self.members
|
||||
filters = {'uuid': filter_uuids, 'user_id': user_id, 'deleted': False}
|
||||
instances = objects.InstanceList.get_by_filters(self._context,
|
||||
filters=filters)
|
||||
return len(instances)
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class InstanceGroupList(base.ObjectListBase, base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
# InstanceGroup <= version 1.3
|
||||
# Version 1.1: InstanceGroup <= version 1.4
|
||||
# Version 1.2: InstanceGroup <= version 1.5
|
||||
# Version 1.3: InstanceGroup <= version 1.6
|
||||
# Version 1.4: InstanceGroup <= version 1.7
|
||||
# Version 1.5: InstanceGroup <= version 1.8
|
||||
# Version 1.6: InstanceGroup <= version 1.9
|
||||
# Version 1.7: InstanceGroup <= version 1.10
|
||||
# Version 1.8: Added get_counts() for quotas
|
||||
VERSION = '1.8'
|
||||
|
||||
fields = {
|
||||
'objects': fields.ListOfObjectsField('InstanceGroup'),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db(context, project_id=None):
|
||||
query = _instance_group_get_query(context)
|
||||
if project_id is not None:
|
||||
query = query.filter_by(project_id=project_id)
|
||||
return query.all()
|
||||
|
||||
@classmethod
|
||||
def _get_main_by_project_id(cls, context, project_id):
|
||||
main_db_groups = db.instance_group_get_all_by_project_id(context,
|
||||
project_id)
|
||||
return base.obj_make_list(context, cls(context), objects.InstanceGroup,
|
||||
main_db_groups)
|
||||
|
||||
@staticmethod
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_counts_from_db(context, project_id, user_id=None):
|
||||
query = context.session.query(api_models.InstanceGroup.id).\
|
||||
filter_by(project_id=project_id)
|
||||
counts = {}
|
||||
counts['project'] = {'server_groups': query.count()}
|
||||
if user_id:
|
||||
query = query.filter_by(user_id=user_id)
|
||||
counts['user'] = {'server_groups': query.count()}
|
||||
return counts
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_project_id(cls, context, project_id):
|
||||
api_db_groups = cls._get_from_db(context, project_id=project_id)
|
||||
main_db_groups = db.instance_group_get_all_by_project_id(context,
|
||||
project_id)
|
||||
return base.obj_make_list(context, cls(context), objects.InstanceGroup,
|
||||
api_db_groups + main_db_groups)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_all(cls, context):
|
||||
api_db_groups = cls._get_from_db(context)
|
||||
main_db_groups = db.instance_group_get_all(context)
|
||||
return base.obj_make_list(context, cls(context), objects.InstanceGroup,
|
||||
api_db_groups + main_db_groups)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_counts(cls, context, project_id, user_id=None):
|
||||
"""Get the counts of InstanceGroup objects in the database.
|
||||
|
||||
:param context: The request context for database access
|
||||
:param project_id: The project_id to count across
|
||||
:param user_id: The user_id to count across
|
||||
:returns: A dict containing the project-scoped counts and user-scoped
|
||||
counts if user_id is specified. For example:
|
||||
|
||||
{'project': {'server_groups': <count across project>},
|
||||
'user': {'server_groups': <count across user>}}
|
||||
"""
|
||||
return cls._get_counts_from_db(context, project_id, user_id=user_id)
|
||||
|
||||
|
||||
@db_api.pick_context_manager_reader
|
||||
def _get_main_instance_groups(context, limit):
|
||||
return context.session.query(main_models.InstanceGroup).\
|
||||
options(joinedload('_policies')).\
|
||||
options(joinedload('_members')).\
|
||||
filter_by(deleted=0).\
|
||||
limit(limit).\
|
||||
all()
|
||||
|
||||
|
||||
def migrate_instance_groups_to_api_db(context, count):
|
||||
main_groups = _get_main_instance_groups(context, count)
|
||||
done = 0
|
||||
for db_group in main_groups:
|
||||
group = objects.InstanceGroup(context=context,
|
||||
user_id=db_group.user_id,
|
||||
project_id=db_group.project_id,
|
||||
uuid=db_group.uuid,
|
||||
name=db_group.name,
|
||||
policies=db_group.policies,
|
||||
members=db_group.members)
|
||||
try:
|
||||
group._create(skipcheck=True)
|
||||
except exception.InstanceGroupIdExists:
|
||||
# NOTE(melwitt): This might happen if there's a failure right after
|
||||
# the InstanceGroup was created and the migration is re-run.
|
||||
pass
|
||||
db_api.instance_group_delete(context, db_group.uuid)
|
||||
done += 1
|
||||
return len(main_groups), done
|
|
@ -0,0 +1,284 @@
|
|||
# Copyright 2013 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db.sqlalchemy import utils as sqlalchemyutils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import versionutils
|
||||
|
||||
from nova import db
|
||||
from nova.db.sqlalchemy import api as db_api
|
||||
from nova.db.sqlalchemy import api_models
|
||||
from nova.db.sqlalchemy import models as main_models
|
||||
from nova import exception
|
||||
from nova import objects
|
||||
from nova.objects import base
|
||||
from nova.objects import fields
|
||||
|
||||
KEYPAIR_TYPE_SSH = 'ssh'
|
||||
KEYPAIR_TYPE_X509 = 'x509'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_from_db(context, user_id, name=None, limit=None, marker=None):
|
||||
query = context.session.query(api_models.KeyPair).\
|
||||
filter(api_models.KeyPair.user_id == user_id)
|
||||
if name is not None:
|
||||
db_keypair = query.filter(api_models.KeyPair.name == name).\
|
||||
first()
|
||||
if not db_keypair:
|
||||
raise exception.KeypairNotFound(user_id=user_id, name=name)
|
||||
return db_keypair
|
||||
|
||||
marker_row = None
|
||||
if marker is not None:
|
||||
marker_row = context.session.query(api_models.KeyPair).\
|
||||
filter(api_models.KeyPair.name == marker).\
|
||||
filter(api_models.KeyPair.user_id == user_id).first()
|
||||
if not marker_row:
|
||||
raise exception.MarkerNotFound(marker=marker)
|
||||
|
||||
query = sqlalchemyutils.paginate_query(
|
||||
query, api_models.KeyPair, limit, ['name'], marker=marker_row)
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
@db_api.api_context_manager.reader
|
||||
def _get_count_from_db(context, user_id):
|
||||
return context.session.query(api_models.KeyPair).\
|
||||
filter(api_models.KeyPair.user_id == user_id).\
|
||||
count()
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _create_in_db(context, values):
|
||||
kp = api_models.KeyPair()
|
||||
kp.update(values)
|
||||
try:
|
||||
kp.save(context.session)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
raise exception.KeyPairExists(key_name=values['name'])
|
||||
return kp
|
||||
|
||||
|
||||
@db_api.api_context_manager.writer
|
||||
def _destroy_in_db(context, user_id, name):
|
||||
result = context.session.query(api_models.KeyPair).\
|
||||
filter_by(user_id=user_id).\
|
||||
filter_by(name=name).\
|
||||
delete()
|
||||
if not result:
|
||||
raise exception.KeypairNotFound(user_id=user_id, name=name)
|
||||
|
||||
|
||||
# TODO(berrange): Remove NovaObjectDictCompat
|
||||
@base.NovaObjectRegistry.register
|
||||
class KeyPair(base.NovaPersistentObject, base.NovaObject,
|
||||
base.NovaObjectDictCompat):
|
||||
# Version 1.0: Initial version
|
||||
# Version 1.1: String attributes updated to support unicode
|
||||
# Version 1.2: Added keypair type
|
||||
# Version 1.3: Name field is non-null
|
||||
# Version 1.4: Add localonly flag to get_by_name()
|
||||
VERSION = '1.4'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
'name': fields.StringField(nullable=False),
|
||||
'user_id': fields.StringField(nullable=True),
|
||||
'fingerprint': fields.StringField(nullable=True),
|
||||
'public_key': fields.StringField(nullable=True),
|
||||
'type': fields.StringField(nullable=False),
|
||||
}
|
||||
|
||||
def obj_make_compatible(self, primitive, target_version):
|
||||
super(KeyPair, self).obj_make_compatible(primitive, target_version)
|
||||
target_version = versionutils.convert_version_to_tuple(target_version)
|
||||
if target_version < (1, 2) and 'type' in primitive:
|
||||
del primitive['type']
|
||||
|
||||
@staticmethod
|
||||
def _from_db_object(context, keypair, db_keypair):
|
||||
ignore = {'deleted': False,
|
||||
'deleted_at': None}
|
||||
for key in keypair.fields:
|
||||
if key in ignore and not hasattr(db_keypair, key):
|
||||
keypair[key] = ignore[key]
|
||||
else:
|
||||
keypair[key] = db_keypair[key]
|
||||
keypair._context = context
|
||||
keypair.obj_reset_changes()
|
||||
return keypair
|
||||
|
||||
@staticmethod
|
||||
def _get_from_db(context, user_id, name):
|
||||
return _get_from_db(context, user_id, name=name)
|
||||
|
||||
@staticmethod
|
||||
def _destroy_in_db(context, user_id, name):
|
||||
return _destroy_in_db(context, user_id, name)
|
||||
|
||||
@staticmethod
|
||||
def _create_in_db(context, values):
|
||||
return _create_in_db(context, values)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_name(cls, context, user_id, name,
|
||||
localonly=False):
|
||||
db_keypair = None
|
||||
if not localonly:
|
||||
try:
|
||||
db_keypair = cls._get_from_db(context, user_id, name)
|
||||
except exception.KeypairNotFound:
|
||||
pass
|
||||
if db_keypair is None:
|
||||
db_keypair = db.key_pair_get(context, user_id, name)
|
||||
return cls._from_db_object(context, cls(), db_keypair)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def destroy_by_name(cls, context, user_id, name):
|
||||
try:
|
||||
cls._destroy_in_db(context, user_id, name)
|
||||
except exception.KeypairNotFound:
|
||||
db.key_pair_destroy(context, user_id, name)
|
||||
|
||||
@base.remotable
|
||||
def create(self):
|
||||
if self.obj_attr_is_set('id'):
|
||||
raise exception.ObjectActionError(action='create',
|
||||
reason='already created')
|
||||
|
||||
# NOTE(danms): Check to see if it exists in the old DB before
|
||||
# letting them create in the API DB, since we won't get protection
|
||||
# from the UC.
|
||||
try:
|
||||
db.key_pair_get(self._context, self.user_id, self.name)
|
||||
raise exception.KeyPairExists(key_name=self.name)
|
||||
except exception.KeypairNotFound:
|
||||
pass
|
||||
|
||||
self._create()
|
||||
|
||||
def _create(self):
|
||||
updates = self.obj_get_changes()
|
||||
db_keypair = self._create_in_db(self._context, updates)
|
||||
self._from_db_object(self._context, self, db_keypair)
|
||||
|
||||
@base.remotable
|
||||
def destroy(self):
|
||||
try:
|
||||
self._destroy_in_db(self._context, self.user_id, self.name)
|
||||
except exception.KeypairNotFound:
|
||||
db.key_pair_destroy(self._context, self.user_id, self.name)
|
||||
|
||||
|
||||
@base.NovaObjectRegistry.register
|
||||
class KeyPairList(base.ObjectListBase, base.NovaObject):
|
||||
# Version 1.0: Initial version
|
||||
# KeyPair <= version 1.1
|
||||
# Version 1.1: KeyPair <= version 1.2
|
||||
# Version 1.2: KeyPair <= version 1.3
|
||||
# Version 1.3: Add new parameters 'limit' and 'marker' to get_by_user()
|
||||
VERSION = '1.3'
|
||||
|
||||
fields = {
|
||||
'objects': fields.ListOfObjectsField('KeyPair'),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_from_db(context, user_id, limit, marker):
|
||||
return _get_from_db(context, user_id, limit=limit, marker=marker)
|
||||
|
||||
@staticmethod
|
||||
def _get_count_from_db(context, user_id):
|
||||
return _get_count_from_db(context, user_id)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_by_user(cls, context, user_id, limit=None, marker=None):
|
||||
try:
|
||||
api_db_keypairs = cls._get_from_db(
|
||||
context, user_id, limit=limit, marker=marker)
|
||||
# NOTE(pkholkin): If we were asked for a marker and found it in
|
||||
# results from the API DB, we must continue our pagination with
|
||||
# just the limit (if any) to the main DB.
|
||||
marker = None
|
||||
except exception.MarkerNotFound:
|
||||
api_db_keypairs = []
|
||||
|
||||
if limit is not None:
|
||||
limit_more = limit - len(api_db_keypairs)
|
||||
else:
|
||||
limit_more = None
|
||||
|
||||
if limit_more is None or limit_more > 0:
|
||||
main_db_keypairs = db.key_pair_get_all_by_user(
|
||||
context, user_id, limit=limit_more, marker=marker)
|
||||
else:
|
||||
main_db_keypairs = []
|
||||
|
||||
return base.obj_make_list(context, cls(context), objects.KeyPair,
|
||||
api_db_keypairs + main_db_keypairs)
|
||||
|
||||
@base.remotable_classmethod
|
||||
def get_count_by_user(cls, context, user_id):
|
||||
return (cls._get_count_from_db(context, user_id) +
|
||||
db.key_pair_count_by_user(context, user_id))
|
||||
|
||||
|
||||
@db_api.pick_context_manager_reader
|
||||
def _count_unmigrated_instances(context):
|
||||
return context.session.query(main_models.InstanceExtra).\
|
||||
filter_by(keypairs=None).\
|
||||
filter_by(deleted=0).\
|
||||
count()
|
||||
|
||||
|
||||
@db_api.pick_context_manager_reader
|
||||
def _get_main_keypairs(context, limit):
|
||||
return context.session.query(main_models.KeyPair).\
|
||||
filter_by(deleted=0).\
|
||||
limit(limit).\
|
||||
all()
|
||||
|
||||
|
||||
def migrate_keypairs_to_api_db(context, count):
|
||||
bad_instances = _count_unmigrated_instances(context)
|
||||
if bad_instances:
|
||||
LOG.error('Some instances are still missing keypair '
|
||||
'information. Unable to run keypair migration '
|
||||
'at this time.')
|
||||
return 0, 0
|
||||
|
||||
main_keypairs = _get_main_keypairs(context, count)
|
||||
done = 0
|
||||
for db_keypair in main_keypairs:
|
||||
kp = objects.KeyPair(context=context,
|
||||
user_id=db_keypair.user_id,
|
||||
name=db_keypair.name,
|
||||
fingerprint=db_keypair.fingerprint,
|
||||
public_key=db_keypair.public_key,
|
||||
type=db_keypair.type)
|
||||
try:
|
||||
kp._create()
|
||||
except exception.KeyPairExists:
|
||||
# NOTE(danms): If this got created somehow in the API DB,
|
||||
# then it's newer and we just continue on to destroy the
|
||||
# old one in the cell DB.
|
||||
pass
|
||||
db_api.key_pair_destroy(context, db_keypair.user_id, db_keypair.name)
|
||||
done += 1
|
||||
|
||||
return len(main_keypairs), done
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,337 @@
|
|||
# Copyright (c) 2011 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
The FilterScheduler is for creating instances locally.
|
||||
You can customize this scheduler by specifying your own Host Filters and
|
||||
Weighing Functions.
|
||||
"""
|
||||
|
||||
import random
|
||||
|
||||
from oslo_log import log as logging
|
||||
from six.moves import range
|
||||
|
||||
import nova.conf
|
||||
from nova import exception
|
||||
from nova.i18n import _
|
||||
from nova import rpc
|
||||
from nova.scheduler import client
|
||||
from nova.scheduler import driver
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FilterScheduler(driver.Scheduler):
|
||||
"""Scheduler that can be used for filtering and weighing."""
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FilterScheduler, self).__init__(*args, **kwargs)
|
||||
self.notifier = rpc.get_notifier('scheduler')
|
||||
scheduler_client = client.SchedulerClient()
|
||||
self.placement_client = scheduler_client.reportclient
|
||||
|
||||
def select_destinations(self, context, spec_obj, instance_uuids,
|
||||
alloc_reqs_by_rp_uuid, provider_summaries):
|
||||
"""Returns a sorted list of HostState objects that satisfy the
|
||||
supplied request_spec.
|
||||
|
||||
These hosts will have already had their resources claimed in Placement.
|
||||
|
||||
:param context: The RequestContext object
|
||||
:param spec_obj: The RequestSpec object
|
||||
:param instance_uuids: List of UUIDs, one for each value of the spec
|
||||
object's num_instances attribute
|
||||
:param alloc_reqs_by_rp_uuid: Optional dict, keyed by resource provider
|
||||
UUID, of the allocation requests that may
|
||||
be used to claim resources against
|
||||
matched hosts. If None, indicates either
|
||||
the placement API wasn't reachable or
|
||||
that there were no allocation requests
|
||||
returned by the placement API. If the
|
||||
latter, the provider_summaries will be an
|
||||
empty dict, not None.
|
||||
:param provider_summaries: Optional dict, keyed by resource provider
|
||||
UUID, of information that will be used by
|
||||
the filters/weighers in selecting matching
|
||||
hosts for a request. If None, indicates that
|
||||
the scheduler driver should grab all compute
|
||||
node information locally and that the
|
||||
Placement API is not used. If an empty dict,
|
||||
indicates the Placement API returned no
|
||||
potential matches for the requested
|
||||
resources.
|
||||
"""
|
||||
self.notifier.info(
|
||||
context, 'scheduler.select_destinations.start',
|
||||
dict(request_spec=spec_obj.to_legacy_request_spec_dict()))
|
||||
|
||||
num_instances = spec_obj.num_instances
|
||||
selected_hosts = self._schedule(context, spec_obj, instance_uuids,
|
||||
alloc_reqs_by_rp_uuid, provider_summaries)
|
||||
|
||||
# Couldn't fulfill the request_spec
|
||||
if len(selected_hosts) < num_instances:
|
||||
# NOTE(Rui Chen): If multiple creates failed, set the updated time
|
||||
# of selected HostState to None so that these HostStates are
|
||||
# refreshed according to database in next schedule, and release
|
||||
# the resource consumed by instance in the process of selecting
|
||||
# host.
|
||||
for host in selected_hosts:
|
||||
host.updated = None
|
||||
|
||||
# Log the details but don't put those into the reason since
|
||||
# we don't want to give away too much information about our
|
||||
# actual environment.
|
||||
LOG.debug('There are %(hosts)d hosts available but '
|
||||
'%(num_instances)d instances requested to build.',
|
||||
{'hosts': len(selected_hosts),
|
||||
'num_instances': num_instances})
|
||||
|
||||
reason = _('There are not enough hosts available.')
|
||||
raise exception.NoValidHost(reason=reason)
|
||||
|
||||
self.notifier.info(
|
||||
context, 'scheduler.select_destinations.end',
|
||||
dict(request_spec=spec_obj.to_legacy_request_spec_dict()))
|
||||
return selected_hosts
|
||||
|
||||
def _schedule(self, context, spec_obj, instance_uuids,
|
||||
alloc_reqs_by_rp_uuid, provider_summaries):
|
||||
"""Returns a list of hosts that meet the required specs, ordered by
|
||||
their fitness.
|
||||
|
||||
These hosts will have already had their resources claimed in Placement.
|
||||
|
||||
:param context: The RequestContext object
|
||||
:param spec_obj: The RequestSpec object
|
||||
:param instance_uuids: List of instance UUIDs to place or move.
|
||||
:param alloc_reqs_by_rp_uuid: Optional dict, keyed by resource provider
|
||||
UUID, of the allocation requests that may
|
||||
be used to claim resources against
|
||||
matched hosts. If None, indicates either
|
||||
the placement API wasn't reachable or
|
||||
that there were no allocation requests
|
||||
returned by the placement API. If the
|
||||
latter, the provider_summaries will be an
|
||||
empty dict, not None.
|
||||
:param provider_summaries: Optional dict, keyed by resource provider
|
||||
UUID, of information that will be used by
|
||||
the filters/weighers in selecting matching
|
||||
hosts for a request. If None, indicates that
|
||||
the scheduler driver should grab all compute
|
||||
node information locally and that the
|
||||
Placement API is not used. If an empty dict,
|
||||
indicates the Placement API returned no
|
||||
potential matches for the requested
|
||||
resources.
|
||||
"""
|
||||
elevated = context.elevated()
|
||||
|
||||
# Find our local list of acceptable hosts by repeatedly
|
||||
# filtering and weighing our options. Each time we choose a
|
||||
# host, we virtually consume resources on it so subsequent
|
||||
# selections can adjust accordingly.
|
||||
|
||||
# Note: remember, we are using an iterator here. So only
|
||||
# traverse this list once. This can bite you if the hosts
|
||||
# are being scanned in a filter or weighing function.
|
||||
hosts = self._get_all_host_states(elevated, spec_obj,
|
||||
provider_summaries)
|
||||
|
||||
# A list of the instance UUIDs that were successfully claimed against
|
||||
# in the placement API. If we are not able to successfully claim for
|
||||
# all involved instances, we use this list to remove those allocations
|
||||
# before returning
|
||||
claimed_instance_uuids = []
|
||||
|
||||
selected_hosts = []
|
||||
|
||||
# NOTE(sbauza): The RequestSpec.num_instances field contains the number
|
||||
# of instances created when the RequestSpec was used to first boot some
|
||||
# instances. This is incorrect when doing a move or resize operation,
|
||||
# so prefer the length of instance_uuids unless it is None.
|
||||
num_instances = (len(instance_uuids) if instance_uuids
|
||||
else spec_obj.num_instances)
|
||||
for num in range(num_instances):
|
||||
hosts = self._get_sorted_hosts(spec_obj, hosts, num)
|
||||
if not hosts:
|
||||
# NOTE(jaypipes): If we get here, that means not all instances
|
||||
# in instance_uuids were able to be matched to a selected host.
|
||||
# So, let's clean up any already-claimed allocations here
|
||||
# before breaking and returning
|
||||
self._cleanup_allocations(claimed_instance_uuids)
|
||||
break
|
||||
|
||||
if (instance_uuids is None or
|
||||
not self.USES_ALLOCATION_CANDIDATES or
|
||||
alloc_reqs_by_rp_uuid is None):
|
||||
# Unfortunately, we still need to deal with older conductors
|
||||
# that may not be passing in a list of instance_uuids. In those
|
||||
# cases, obviously we can't claim resources because we don't
|
||||
# have instance UUIDs to claim with, so we just grab the first
|
||||
# host in the list of sorted hosts. In addition to older
|
||||
# conductors, we need to support the caching scheduler, which
|
||||
# doesn't use the placement API (and has
|
||||
# USES_ALLOCATION_CANDIDATE = False) and therefore we skip all
|
||||
# the claiming logic for that scheduler driver. Finally, if
|
||||
# there was a problem communicating with the placement API,
|
||||
# alloc_reqs_by_rp_uuid will be None, so we skip claiming in
|
||||
# that case as well
|
||||
claimed_host = hosts[0]
|
||||
else:
|
||||
instance_uuid = instance_uuids[num]
|
||||
|
||||
# Attempt to claim the resources against one or more resource
|
||||
# providers, looping over the sorted list of possible hosts
|
||||
# looking for an allocation request that contains that host's
|
||||
# resource provider UUID
|
||||
claimed_host = None
|
||||
for host in hosts:
|
||||
cn_uuid = host.uuid
|
||||
if cn_uuid not in alloc_reqs_by_rp_uuid:
|
||||
LOG.debug("Found host state %s that wasn't in "
|
||||
"allocation requests. Skipping.", cn_uuid)
|
||||
continue
|
||||
|
||||
alloc_reqs = alloc_reqs_by_rp_uuid[cn_uuid]
|
||||
if self._claim_resources(elevated, spec_obj, instance_uuid,
|
||||
alloc_reqs):
|
||||
claimed_host = host
|
||||
break
|
||||
|
||||
if claimed_host is None:
|
||||
# We weren't able to claim resources in the placement API
|
||||
# for any of the sorted hosts identified. So, clean up any
|
||||
# successfully-claimed resources for prior instances in
|
||||
# this request and return an empty list which will cause
|
||||
# select_destinations() to raise NoValidHost
|
||||
LOG.debug("Unable to successfully claim against any host.")
|
||||
self._cleanup_allocations(claimed_instance_uuids)
|
||||
return []
|
||||
|
||||
claimed_instance_uuids.append(instance_uuid)
|
||||
|
||||
LOG.debug("Selected host: %(host)s", {'host': claimed_host})
|
||||
selected_hosts.append(claimed_host)
|
||||
|
||||
# Now consume the resources so the filter/weights will change for
|
||||
# the next instance.
|
||||
claimed_host.consume_from_request(spec_obj)
|
||||
if spec_obj.instance_group is not None:
|
||||
spec_obj.instance_group.hosts.append(claimed_host.host)
|
||||
# hosts has to be not part of the updates when saving
|
||||
spec_obj.instance_group.obj_reset_changes(['hosts'])
|
||||
return selected_hosts
|
||||
|
||||
def _cleanup_allocations(self, instance_uuids):
|
||||
"""Removes allocations for the supplied instance UUIDs."""
|
||||
if not instance_uuids:
|
||||
return
|
||||
LOG.debug("Cleaning up allocations for %s", instance_uuids)
|
||||
for uuid in instance_uuids:
|
||||
self.placement_client.delete_allocation_for_instance(uuid)
|
||||
|
||||
def _claim_resources(self, ctx, spec_obj, instance_uuid, alloc_reqs):
|
||||
"""Given an instance UUID (representing the consumer of resources), the
|
||||
HostState object for the host that was chosen for the instance, and a
|
||||
list of allocation request JSON objects, attempt to claim resources for
|
||||
the instance in the placement API. Returns True if the claim process
|
||||
was successful, False otherwise.
|
||||
|
||||
:param ctx: The RequestContext object
|
||||
:param spec_obj: The RequestSpec object
|
||||
:param instance_uuid: The UUID of the consuming instance
|
||||
:param cn_uuid: UUID of the host to allocate against
|
||||
:param alloc_reqs: A list of allocation request JSON objects that
|
||||
allocate against (at least) the compute host
|
||||
selected by the _schedule() method. These allocation
|
||||
requests were constructed from a call to the GET
|
||||
/allocation_candidates placement API call. Each
|
||||
allocation_request satisfies the original request
|
||||
for resources and can be supplied as-is (along with
|
||||
the project and user ID to the placement API's
|
||||
PUT /allocations/{consumer_uuid} call to claim
|
||||
resources for the instance
|
||||
"""
|
||||
LOG.debug("Attempting to claim resources in the placement API for "
|
||||
"instance %s", instance_uuid)
|
||||
|
||||
project_id = spec_obj.project_id
|
||||
|
||||
# NOTE(jaypipes): So, the RequestSpec doesn't store the user_id,
|
||||
# only the project_id, so we need to grab the user information from
|
||||
# the context. Perhaps we should consider putting the user ID in
|
||||
# the spec object?
|
||||
user_id = ctx.user_id
|
||||
|
||||
# TODO(jaypipes): Loop through all allocation requests instead of just
|
||||
# trying the first one. For now, since we'll likely want to order the
|
||||
# allocation requests in the future based on information in the
|
||||
# provider summaries, we'll just try to claim resources using the first
|
||||
# allocation request
|
||||
alloc_req = alloc_reqs[0]
|
||||
|
||||
return self.placement_client.claim_resources(instance_uuid,
|
||||
alloc_req, project_id, user_id)
|
||||
|
||||
def _get_sorted_hosts(self, spec_obj, host_states, index):
|
||||
"""Returns a list of HostState objects that match the required
|
||||
scheduling constraints for the request spec object and have been sorted
|
||||
according to the weighers.
|
||||
"""
|
||||
filtered_hosts = self.host_manager.get_filtered_hosts(host_states,
|
||||
spec_obj, index)
|
||||
|
||||
LOG.debug("Filtered %(hosts)s", {'hosts': filtered_hosts})
|
||||
|
||||
if not filtered_hosts:
|
||||
return []
|
||||
|
||||
weighed_hosts = self.host_manager.get_weighed_hosts(filtered_hosts,
|
||||
spec_obj)
|
||||
# Strip off the WeighedHost wrapper class...
|
||||
weighed_hosts = [h.obj for h in weighed_hosts]
|
||||
|
||||
LOG.debug("Weighed %(hosts)s", {'hosts': weighed_hosts})
|
||||
|
||||
# We randomize the first element in the returned list to alleviate
|
||||
# congestion where the same host is consistently selected among
|
||||
# numerous potential hosts for similar request specs.
|
||||
host_subset_size = CONF.filter_scheduler.host_subset_size
|
||||
if host_subset_size < len(weighed_hosts):
|
||||
weighed_subset = weighed_hosts[0:host_subset_size]
|
||||
else:
|
||||
weighed_subset = weighed_hosts
|
||||
chosen_host = random.choice(weighed_subset)
|
||||
weighed_hosts.remove(chosen_host)
|
||||
return [chosen_host] + weighed_hosts
|
||||
|
||||
def _get_all_host_states(self, context, spec_obj, provider_summaries):
|
||||
"""Template method, so a subclass can implement caching."""
|
||||
# NOTE(jaypipes): provider_summaries being None is treated differently
|
||||
# from an empty dict. provider_summaries is None when we want to grab
|
||||
# all compute nodes, for instance when using the caching scheduler.
|
||||
# The provider_summaries variable will be an empty dict when the
|
||||
# Placement API found no providers that match the requested
|
||||
# constraints, which in turn makes compute_uuids an empty list and
|
||||
# get_host_states_by_uuids will return an empty tuple also, which will
|
||||
# eventually result in a NoValidHost error.
|
||||
compute_uuids = None
|
||||
if provider_summaries is not None:
|
||||
compute_uuids = list(provider_summaries.keys())
|
||||
return self.host_manager.get_host_states_by_uuids(context,
|
||||
compute_uuids,
|
||||
spec_obj)
|
|
@ -0,0 +1,178 @@
|
|||
# Copyright 2012 Michael Still and Canonical Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Config Drive v2 helper."""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from oslo_utils import fileutils
|
||||
from oslo_utils import units
|
||||
import six
|
||||
|
||||
import nova.conf
|
||||
from nova import exception
|
||||
from nova.objects import fields
|
||||
from nova import utils
|
||||
from nova import version
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
# Config drives are 64mb, if we can't size to the exact size of the data
|
||||
CONFIGDRIVESIZE_BYTES = 64 * units.Mi
|
||||
|
||||
|
||||
class ConfigDriveBuilder(object):
|
||||
"""Build config drives, optionally as a context manager."""
|
||||
|
||||
def __init__(self, instance_md=None):
|
||||
self.imagefile = None
|
||||
self.mdfiles = []
|
||||
|
||||
if instance_md is not None:
|
||||
self.add_instance_metadata(instance_md)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exctype, excval, exctb):
|
||||
if exctype is not None:
|
||||
# NOTE(mikal): this means we're being cleaned up because an
|
||||
# exception was thrown. All bets are off now, and we should not
|
||||
# swallow the exception
|
||||
return False
|
||||
self.cleanup()
|
||||
|
||||
def _add_file(self, basedir, path, data):
|
||||
filepath = os.path.join(basedir, path)
|
||||
dirname = os.path.dirname(filepath)
|
||||
fileutils.ensure_tree(dirname)
|
||||
with open(filepath, 'wb') as f:
|
||||
# the given data can be either text or bytes. we can only write
|
||||
# bytes into files.
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf-8')
|
||||
f.write(data)
|
||||
|
||||
def add_instance_metadata(self, instance_md):
|
||||
for (path, data) in instance_md.metadata_for_config_drive():
|
||||
self.mdfiles.append((path, data))
|
||||
|
||||
def _write_md_files(self, basedir):
|
||||
for data in self.mdfiles:
|
||||
self._add_file(basedir, data[0], data[1])
|
||||
|
||||
def _make_iso9660(self, path, tmpdir):
|
||||
publisher = "%(product)s %(version)s" % {
|
||||
'product': version.product_string(),
|
||||
'version': version.version_string_with_package()
|
||||
}
|
||||
|
||||
utils.execute(CONF.mkisofs_cmd,
|
||||
'-o', path,
|
||||
'-ldots',
|
||||
'-allow-lowercase',
|
||||
'-allow-multidot',
|
||||
'-l',
|
||||
'-publisher',
|
||||
publisher,
|
||||
'-quiet',
|
||||
'-J',
|
||||
'-r',
|
||||
'-V', 'config-2',
|
||||
tmpdir,
|
||||
attempts=1,
|
||||
run_as_root=False)
|
||||
|
||||
def _make_vfat(self, path, tmpdir):
|
||||
# NOTE(mikal): This is a little horrible, but I couldn't find an
|
||||
# equivalent to genisoimage for vfat filesystems.
|
||||
with open(path, 'wb') as f:
|
||||
f.truncate(CONFIGDRIVESIZE_BYTES)
|
||||
|
||||
utils.mkfs('vfat', path, label='config-2')
|
||||
|
||||
with utils.tempdir() as mountdir:
|
||||
mounted = False
|
||||
try:
|
||||
_, err = utils.trycmd(
|
||||
'mount', '-o', 'loop,uid=%d,gid=%d' % (os.getuid(),
|
||||
os.getgid()),
|
||||
path,
|
||||
mountdir,
|
||||
run_as_root=True)
|
||||
if err:
|
||||
raise exception.ConfigDriveMountFailed(operation='mount',
|
||||
error=err)
|
||||
mounted = True
|
||||
|
||||
# NOTE(mikal): I can't just use shutils.copytree here,
|
||||
# because the destination directory already
|
||||
# exists. This is annoying.
|
||||
for ent in os.listdir(tmpdir):
|
||||
shutil.copytree(os.path.join(tmpdir, ent),
|
||||
os.path.join(mountdir, ent))
|
||||
|
||||
finally:
|
||||
if mounted:
|
||||
utils.execute('umount', mountdir, run_as_root=True)
|
||||
|
||||
def make_drive(self, path):
|
||||
"""Make the config drive.
|
||||
|
||||
:param path: the path to place the config drive image at
|
||||
|
||||
:raises ProcessExecuteError if a helper process has failed.
|
||||
"""
|
||||
with utils.tempdir() as tmpdir:
|
||||
self._write_md_files(tmpdir)
|
||||
|
||||
if CONF.config_drive_format == 'iso9660':
|
||||
self._make_iso9660(path, tmpdir)
|
||||
elif CONF.config_drive_format == 'vfat':
|
||||
self._make_vfat(path, tmpdir)
|
||||
else:
|
||||
raise exception.ConfigDriveUnknownFormat(
|
||||
format=CONF.config_drive_format)
|
||||
|
||||
def cleanup(self):
|
||||
if self.imagefile:
|
||||
fileutils.delete_if_exists(self.imagefile)
|
||||
|
||||
def __repr__(self):
|
||||
return "<ConfigDriveBuilder: " + str(self.mdfiles) + ">"
|
||||
|
||||
|
||||
def required_by(instance):
|
||||
|
||||
image_prop = instance.image_meta.properties.get(
|
||||
"img_config_drive",
|
||||
fields.ConfigDrivePolicy.OPTIONAL)
|
||||
|
||||
return (instance.config_drive or
|
||||
CONF.force_config_drive or
|
||||
image_prop == fields.ConfigDrivePolicy.MANDATORY
|
||||
)
|
||||
|
||||
|
||||
def update_instance(instance):
|
||||
"""Update the instance config_drive setting if necessary
|
||||
|
||||
The image or configuration file settings may override the default instance
|
||||
setting. In this case the instance needs to mirror the actual
|
||||
virtual machine configuration.
|
||||
"""
|
||||
if not instance.config_drive and required_by(instance):
|
||||
instance.config_drive = True
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue