# Source: blazar/blazar/plugins/instances/instance_plugin.py
# (563 lines, 24 KiB, Python)

# Copyright (c) 2017 NTT.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from novaclient import exceptions as nova_exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils.strutils import bool_from_string
from blazar import context
from blazar.db import api as db_api
from blazar.db import utils as db_utils
from blazar import exceptions
from blazar.manager import exceptions as mgr_exceptions
from blazar.plugins import base
from blazar.plugins import instances as plugin
from blazar.plugins import oshosts
from blazar import status
from blazar.utils.openstack import nova
from blazar.utils import plugins as plugins_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Prefix used for Nova objects created per reservation: the flavor name,
# the server group name, and the aggregate metadata key.
RESERVATION_PREFIX = 'reservation'
# Flavor extra-spec key set to the reservation id on reserved flavors
# (presumably consumed by Nova's AggregateInstanceExtraSpecsFilter against
# the aggregate metadata -- confirm against the deployment's filters).
FLAVOR_EXTRA_SPEC = ':'.join(('aggregate_instance_extra_specs',
                              RESERVATION_PREFIX))
class VirtualInstancePlugin(base.BasePlugin, nova.NovaClientWrapper):
    """Plugin for virtual instance resources.

    Each instance reservation is backed by three Nova objects: a private
    flavor whose id is the reservation id, a server group named after the
    reservation, and a host aggregate (reservation pool) named after the
    reservation into which the allocated hosts are added when it starts.
    """

    resource_type = plugin.RESOURCE_TYPE
    title = 'Virtual Instance Plugin'

    def __init__(self):
        super(VirtualInstancePlugin, self).__init__(
            username=CONF.os_admin_username,
            password=CONF.os_admin_password,
            user_domain_name=CONF.os_admin_user_domain_name,
            project_name=CONF.os_admin_project_name,
            project_domain_name=CONF.os_admin_project_domain_name)
        self.freepool_name = CONF.nova.aggregate_freepool_name
        # Register heal_reservations on the physical host monitor so that
        # host failures also trigger healing of instance reservations.
        self.monitor = oshosts.host_plugin.PhysicalHostMonitorPlugin()
        self.monitor.register_healing_handler(self.heal_reservations)

    def filter_hosts_by_reservation(self, hosts, start_date, end_date,
                                    excludes):
        """Split hosts according to the reservations they carry.

        :param hosts: candidate host records; each must have an 'id' key.
        :param start_date: start of the time window to check.
        :param end_date: end of the time window to check.
        :param excludes: reservation ids to ignore (may be None or empty).
        :return: a tuple ``(free, non_free)``. ``free`` holds hosts with no
            reservation in the window; ``non_free`` holds hosts whose
            reservations are all non-physical-host reservations. Hosts with
            at least one physical host reservation appear in neither list.
            Entries are ``{'host': host, 'reservations': list-or-None}``.
        """
        free = []
        non_free = []

        for host in hosts:
            reservations = db_utils.get_reservations_by_host_id(
                host['id'], start_date, end_date)

            if excludes:
                reservations = [r for r in reservations
                                if r['id'] not in excludes]

            if not reservations:
                free.append({'host': host, 'reservations': None})
            elif not any(r['resource_type'] == oshosts.RESOURCE_TYPE
                         for r in reservations):
                # Only instance reservations here: the host can still be
                # shared if enough capacity remains (see max_usages).
                non_free.append({'host': host, 'reservations': reservations})

        return free, non_free

    def max_usages(self, host, reservations):
        """Compute the peak usage caused by a set of reservations.

        The events of every lease owning one of ``reservations`` are
        replayed in time order: 'start_lease' adds the reservation's vcpus,
        memory_mb and disk_gb to running totals and 'end_lease' subtracts
        them. Other event types are ignored.

        :param host: unused; kept for interface compatibility.
        :param reservations: reservation records carrying 'lease_id' and an
            'instance_reservations' mapping with 'vcpus', 'memory_mb' and
            'disk_gb'.
        :return: a tuple ``(max_vcpus, max_memory_mb, max_disk_gb)``.
        """
        def resource_usage_by_event(event, resource_type):
            return event['reservation']['instance_reservations'][resource_type]

        events_list = []
        for r in reservations:
            fetched_events = db_api.event_get_all_sorted_by_filters(
                sort_key='time', sort_dir='asc',
                filters={'lease_id': r['lease_id']})
            events_list.extend([{'event': e, 'reservation': r}
                                for e in fetched_events])

        events_list.sort(key=lambda x: x['event']['time'])

        max_vcpus = max_memory = max_disk = 0
        current_vcpus = current_memory = current_disk = 0

        for event in events_list:
            event_type = event['event']['event_type']
            if event_type == 'start_lease':
                current_vcpus += resource_usage_by_event(event, 'vcpus')
                current_memory += resource_usage_by_event(event, 'memory_mb')
                current_disk += resource_usage_by_event(event, 'disk_gb')
                max_vcpus = max(max_vcpus, current_vcpus)
                max_memory = max(max_memory, current_memory)
                max_disk = max(max_disk, current_disk)
            elif event_type == 'end_lease':
                current_vcpus -= resource_usage_by_event(event, 'vcpus')
                current_memory -= resource_usage_by_event(event, 'memory_mb')
                current_disk -= resource_usage_by_event(event, 'disk_gb')

        return max_vcpus, max_memory, max_disk

    def query_available_hosts(self, cpus=None, memory=None, disk=None,
                              resource_properties=None,
                              start_date=None, end_date=None,
                              excludes_res=None):
        """Query hosts that are available for a reservation.

        Although they default to None, ``cpus``, ``memory`` and ``disk``
        must be numbers: they are added to the reserved usage below.

        :param resource_properties: optional extra host requirements.
        :param excludes_res: reservation ids whose usage is ignored.
        :return: reservable hosts that can fit the request in the window.
            Hosts already shared by other instance reservations come first,
            followed by completely free hosts.
        """
        flavor_definitions = [
            'and',
            [">=", "$vcpus", str(cpus)],
            [">=", "$memory_mb", str(memory)],
            [">=", "$local_gb", str(disk)],
        ]

        filters = plugins_utils.convert_requirements(flavor_definitions)

        if resource_properties:
            filters += plugins_utils.convert_requirements(resource_properties)

        hosts = db_api.reservable_host_get_all_by_queries(filters)
        free_hosts, reserved_hosts = \
            self.filter_hosts_by_reservation(hosts, start_date, end_date,
                                             excludes_res)

        available_hosts = []
        for host_info in reserved_hosts:
            host = host_info['host']
            reservations = host_info['reservations']
            max_cpus, max_memory, max_disk = self.max_usages(host,
                                                             reservations)

            if not (max_cpus + cpus > host['vcpus'] or
                    max_memory + memory > host['memory_mb'] or
                    max_disk + disk > host['local_gb']):
                available_hosts.append(host)

        available_hosts.extend([h['host'] for h in free_hosts])
        return available_hosts

    def pickup_hosts(self, reservation_id, values):
        """Check whether Blazar can accommodate the request.

        This method filters and picks up hosts for this reservation
        with the following steps.

        1. filter hosts that have a spec large enough for the flavor
        2. categorize hosts into allocated and not-allocated hosts
           within the reservation time frame
        3. filter out hosts used by physical host reservations
        4. filter out hosts that cannot accommodate the flavor in the
           time frame because of other reservations

        Hosts already allocated to this reservation are kept when they are
        still usable; the reservation's own usage is excluded from the
        capacity check.

        :return: ``{'added': set of host ids, 'removed': set of host ids}``
            relative to the reservation's current allocations.
        """
        query_params = {
            'cpus': values['vcpus'],
            'memory': values['memory_mb'],
            'disk': values['disk_gb'],
            'resource_properties': values['resource_properties'],
            'start_date': values['start_date'],
            'end_date': values['end_date']
        }

        # add the specific query param for reservation update
        old_allocs = db_api.host_allocation_get_all_by_values(
            reservation_id=reservation_id)
        if old_allocs:
            query_params['excludes_res'] = [reservation_id]

        new_hosts = self.query_available_hosts(**query_params)

        old_host_ids = {h['compute_host_id'] for h in old_allocs}
        candidate_ids = {h['id'] for h in new_hosts}

        kept_host_ids = old_host_ids & candidate_ids
        removed_host_ids = old_host_ids - candidate_ids
        extra_host_ids = candidate_ids - old_host_ids
        added_host_ids = set()

        if len(kept_host_ids) > values['amount']:
            extra = len(kept_host_ids) - values['amount']
            for _ in range(extra):
                removed_host_ids.add(kept_host_ids.pop())
        elif len(kept_host_ids) < values['amount']:
            less = values['amount'] - len(kept_host_ids)
            # Preserve query_available_hosts ordering (shared hosts first).
            ordered_extra_host_ids = [h['id'] for h in new_hosts
                                      if h['id'] in extra_host_ids]
            for i in range(min(less, len(extra_host_ids))):
                added_host_ids.add(ordered_extra_host_ids[i])

        return {'added': added_host_ids, 'removed': removed_host_ids}

    def _create_flavor(self, reservation_id, vcpus, memory, disk, group_id):
        """Create the private flavor for a reservation.

        The flavor id is the reservation id, and its extra specs record the
        reservation id and the server group id.
        """
        flavor_details = {
            'flavorid': reservation_id,
            'name': RESERVATION_PREFIX + ":" + reservation_id,
            'vcpus': vcpus,
            'ram': memory,
            'disk': disk,
            'is_public': False
        }
        reserved_flavor = self.nova.nova.flavors.create(**flavor_details)

        extra_specs = {
            FLAVOR_EXTRA_SPEC: reservation_id,
            "affinity_id": group_id
        }
        reserved_flavor.set_keys(extra_specs)

        return reserved_flavor

    @staticmethod
    def _delete_server_group_quietly(client, group_id):
        """Best-effort deletion of a server group; failures are only logged."""
        try:
            client.nova.server_groups.delete(group_id)
        except nova_exceptions.ClientException:
            LOG.warning('Failed to delete server group %s', group_id)

    def _create_resources(self, inst_reservation):
        """Create the server group, flavor and aggregate for a reservation.

        If creating the flavor or the aggregate fails, the server group
        created first is deleted before the error propagates. Its id has not
        been persisted yet, so cleanup_resources() could not find it.

        :return: a tuple ``(flavor, server_group, aggregate)``.
        """
        reservation_id = inst_reservation['reservation_id']

        ctx = context.current()
        user_client = nova.NovaClientWrapper()

        reserved_group = user_client.nova.server_groups.create(
            RESERVATION_PREFIX + ':' + reservation_id,
            'affinity' if inst_reservation['affinity'] else 'anti-affinity'
        )

        try:
            reserved_flavor = self._create_flavor(
                reservation_id,
                inst_reservation['vcpus'],
                inst_reservation['memory_mb'],
                inst_reservation['disk_gb'],
                reserved_group.id)

            pool = nova.ReservationPool()
            pool_metadata = {
                RESERVATION_PREFIX: reservation_id,
                'filter_tenant_id': ctx.project_id,
                'affinity_id': reserved_group.id
            }
            agg = pool.create(name=reservation_id, metadata=pool_metadata)
        except Exception:
            # Roll back the orphaned group, then re-raise the original error.
            self._delete_server_group_quietly(user_client, reserved_group.id)
            raise

        return reserved_flavor, reserved_group, agg

    def cleanup_resources(self, instance_reservation):
        """Delete the Nova resources backing an instance reservation.

        Resources that no longer exist are skipped. The server group is
        skipped when no id has been recorded, which is the case when
        resource creation failed part-way.
        """
        def check_and_delete_resource(client, resource_id):
            try:
                client.delete(resource_id)
            except (nova_exceptions.NotFound,
                    mgr_exceptions.AggregateNotFound):
                # Nova reports a missing flavor/group as NotFound, while
                # ReservationPool reports a missing aggregate as
                # AggregateNotFound.
                pass

        reservation_id = instance_reservation['reservation_id']

        server_group_id = instance_reservation.get('server_group_id')
        if server_group_id:
            check_and_delete_resource(self.nova.nova.server_groups,
                                      server_group_id)
        check_and_delete_resource(self.nova.nova.flavors, reservation_id)
        check_and_delete_resource(nova.ReservationPool(), reservation_id)

    def update_resources(self, reservation_id):
        """Update reserved resources in Nova.

        If the reservation is active, add its allocated hosts into the
        reserved aggregate. Hosts that are already members are left alone.
        Otherwise, recreate the reserved flavor from the current
        vcpus/memory_mb/disk_gb values.

        :raises NovaClientError: if Nova rejects the update.
        """
        reservation = db_api.reservation_get(reservation_id)

        if reservation['status'] == status.reservation.ACTIVE:
            pool = nova.ReservationPool()
            for allocation in db_api.host_allocation_get_all_by_values(
                    reservation_id=reservation['id']):
                host = db_api.host_get(allocation['compute_host_id'])
                try:
                    pool.add_computehost(
                        reservation['aggregate_id'],
                        host['service_name'], stay_in=True)
                except mgr_exceptions.AggregateAlreadyHasHost:
                    pass
                except nova_exceptions.ClientException:
                    err_msg = ('Fail to add host %s to aggregate %s.'
                               % (host['service_name'],
                                  reservation['aggregate_id']))
                    raise mgr_exceptions.NovaClientError(err_msg)
        else:
            try:
                self.nova.nova.flavors.delete(reservation['id'])
                self._create_flavor(reservation['id'],
                                    reservation['vcpus'],
                                    reservation['memory_mb'],
                                    reservation['disk_gb'],
                                    reservation['server_group_id'])
            except nova_exceptions.ClientException:
                LOG.exception("Failed to update Nova resources "
                              "for reservation %s", reservation['id'])
                raise mgr_exceptions.NovaClientError()

    def validate_reservation_param(self, values):
        """Ensure all attributes required for a reservation are present.

        :raises MissingParameter: listing every missing attribute.
        """
        marshall_attributes = {'vcpus', 'memory_mb', 'disk_gb',
                               'amount', 'affinity',
                               'resource_properties'}
        missing_attr = marshall_attributes - set(values.keys())
        if missing_attr:
            raise mgr_exceptions.MissingParameter(param=','.join(missing_attr))

    def reserve_resource(self, reservation_id, values):
        """Allocate hosts and create Nova resources for a reservation.

        :param reservation_id: id of the parent reservation.
        :param values: request values; see validate_reservation_param for
            the required keys, plus 'start_date' and 'end_date'.
        :return: id of the created instance reservation record.
        :raises BlazarException: if affinity=True is requested.
        :raises HostNotFound: if not enough hosts can accommodate it.
        :raises NovaClientError: if creating Nova resources fails.
        """
        self.validate_reservation_param(values)

        # TODO(masahito) the instance reservation plugin only supports
        # anti-affinity rule in short-term goal.
        if bool_from_string(values['affinity']):
            raise exceptions.BlazarException('affinity = True is not '
                                             'supported.')

        hosts = self.pickup_hosts(reservation_id, values)

        if len(hosts['added']) < values['amount']:
            raise mgr_exceptions.HostNotFound("The reservation can't be "
                                              "accommodate because of less "
                                              "capacity.")

        instance_reservation_val = {
            'reservation_id': reservation_id,
            'vcpus': values['vcpus'],
            'memory_mb': values['memory_mb'],
            'disk_gb': values['disk_gb'],
            'amount': values['amount'],
            'affinity': bool_from_string(values['affinity']),
            'resource_properties': values['resource_properties']
        }
        instance_reservation = db_api.instance_reservation_create(
            instance_reservation_val)

        for host_id in hosts['added']:
            db_api.host_allocation_create({'compute_host_id': host_id,
                                           'reservation_id': reservation_id})

        try:
            flavor, group, pool = self._create_resources(instance_reservation)
        except nova_exceptions.ClientException:
            LOG.exception("Failed to create Nova resources "
                          "for reservation %s", reservation_id)
            self.cleanup_resources(instance_reservation)
            raise mgr_exceptions.NovaClientError()

        db_api.instance_reservation_update(instance_reservation['id'],
                                           {'flavor_id': flavor.id,
                                            'server_group_id': group.id,
                                            'aggregate_id': pool.id})

        return instance_reservation['id']

    def update_host_allocations(self, added, removed, reservation_id):
        """Apply host allocation changes for a reservation in the DB.

        :param added: host ids to allocate to the reservation.
        :param removed: host ids whose allocations are destroyed.
        """
        allocations = db_api.host_allocation_get_all_by_values(
            reservation_id=reservation_id)

        removed_allocs = [a for a in allocations
                          if a['compute_host_id'] in removed]
        for alloc in removed_allocs:
            db_api.host_allocation_destroy(alloc['id'])

        for added_host in added:
            db_api.host_allocation_create({'compute_host_id': added_host,
                                           'reservation_id': reservation_id})

    def update_reservation(self, reservation_id, new_values):
        """Update an instance reservation with requested parameters.

        This method allows users to update an instance reservation under the
        following conditions.

        - If an instance reservation has not started yet
          - vcpus, memory_mb, disk_gb and amount can be updated as long as
            Blazar can accommodate the new request.
        - If an instance reservation has already started
          - only amount is increasable.

        :param reservation_id: id of the reservation to update.
        :param new_values: requested values; must contain 'start_date' and
            'end_date'. The dict is not modified.
        :raises BlazarException: if affinity=True is requested.
        :raises InvalidStateUpdate: if an active reservation is asked to
            change anything other than amount, or the reservation is in
            error status.
        :raises CantUpdateParameter: if the update would remove or replace
            hosts of an active reservation.
        :raises NovaClientError: if updating Nova resources fails.
        """
        # TODO(masahito) the instance reservation plugin only supports
        # anti-affinity rule in short-term goal.
        if bool_from_string(new_values.get('affinity', None)):
            raise exceptions.BlazarException('affinity = True is not '
                                             'supported.')

        reservation = db_api.reservation_get(reservation_id)
        lease = db_api.lease_get(reservation['lease_id'])

        updatable = ['vcpus', 'memory_mb', 'disk_gb', 'affinity', 'amount',
                     'resource_properties']
        requested = [k for k in new_values if k in updatable]

        if (not requested
                and new_values['start_date'] >= lease['start_date']
                and new_values['end_date'] <= lease['end_date']):
            # no update because of just shortening the reservation time
            return

        # Once active, every updatable field except 'amount' is frozen.
        if (reservation['status'] == status.reservation.ACTIVE and
                any(k != 'amount' for k in requested)):
            msg = "An active reservation only accepts to update amount."
            raise mgr_exceptions.InvalidStateUpdate(msg)

        if reservation['status'] == 'error':
            msg = "An error reservation doesn't accept an updating request."
            raise mgr_exceptions.InvalidStateUpdate(msg)

        # Work on a copy so the caller's dict is untouched; fields that were
        # not requested keep the reservation's current values.
        values = dict(new_values)
        for key in updatable:
            if key not in values:
                values[key] = reservation[key]

        changed_hosts = self.pickup_hosts(reservation_id, values)
        if (reservation['status'] == status.reservation.ACTIVE
                and len(changed_hosts['removed']) > 0):
            err_msg = ("Instance reservation doesn't allow to reduce/replace "
                       "reserved instance slots when the reservation is in "
                       "active status.")
            raise mgr_exceptions.CantUpdateParameter(err_msg)

        db_api.instance_reservation_update(
            reservation['resource_id'],
            {key: values[key] for key in updatable})

        self.update_host_allocations(changed_hosts['added'],
                                     changed_hosts['removed'],
                                     reservation_id)

        self.update_resources(reservation_id)

    def on_start(self, resource_id):
        """Start an instance reservation.

        Grant the current project access to the reserved flavor and add all
        allocated hosts to the reserved aggregate.

        :raises EventError: if the flavor access cannot be granted.
        """
        ctx = context.current()
        instance_reservation = db_api.instance_reservation_get(resource_id)
        reservation_id = instance_reservation['reservation_id']

        try:
            self.nova.flavor_access.add_tenant_access(reservation_id,
                                                      ctx.project_id)
        except nova_exceptions.ClientException:
            LOG.exception('Failed to associate flavor %(reservation_id)s '
                          'to project %(project_id)s',
                          {'reservation_id': reservation_id,
                           'project_id': ctx.project_id})
            raise mgr_exceptions.EventError()

        pool = nova.ReservationPool()
        for allocation in db_api.host_allocation_get_all_by_values(
                reservation_id=reservation_id):
            host = db_api.host_get(allocation['compute_host_id'])
            pool.add_computehost(instance_reservation['aggregate_id'],
                                 host['service_name'], stay_in=True)

    def on_end(self, resource_id):
        """End an instance reservation.

        Revoke flavor access, release host allocations, delete servers
        booted with the reserved flavor (across all tenants) and delete the
        reservation's Nova resources.
        """
        instance_reservation = db_api.instance_reservation_get(resource_id)
        ctx = context.current()

        try:
            self.nova.flavor_access.remove_tenant_access(
                instance_reservation['reservation_id'], ctx.project_id)
        except nova_exceptions.NotFound:
            pass

        allocations = db_api.host_allocation_get_all_by_values(
            reservation_id=instance_reservation['reservation_id'])
        for allocation in allocations:
            db_api.host_allocation_destroy(allocation['id'])

        for server in self.nova.servers.list(search_opts={
                'flavor': instance_reservation['reservation_id'],
                'all_tenants': 1}, detailed=False):
            server.delete()

        self.cleanup_resources(instance_reservation)

    def heal_reservations(self, failed_resources, interval_begin,
                          interval_end):
        """Heal reservations which suffer from resource failures.

        For every allocation of an instance reservation on a failed host,
        try to move it to an alternative host.

        :param failed_resources: failed host records (each with an 'id').
        :param interval_begin: start date of the period to heal.
        :param interval_end: end date of the period to heal.
        :return: a dictionary of {reservation id: flags to update}
                 e.g. {'de27786d-bd96-46bb-8363-19c13b2c6657':
                       {'missing_resources': True}}
        """
        reservation_flags = {}

        host_ids = [h['id'] for h in failed_resources]
        reservations = db_utils.get_reservations_by_host_ids(host_ids,
                                                             interval_begin,
                                                             interval_end)

        for reservation in reservations:
            if reservation['resource_type'] != plugin.RESOURCE_TYPE:
                continue

            failed_allocations = [
                alloc for alloc in reservation['computehost_allocations']
                if alloc['compute_host_id'] in host_ids]
            for allocation in failed_allocations:
                if self._reallocate(allocation):
                    # Only running reservations need to report the change.
                    if reservation['status'] == status.reservation.ACTIVE:
                        reservation_flags.setdefault(
                            reservation['id'], {})['resources_changed'] = True
                else:
                    reservation_flags.setdefault(
                        reservation['id'], {})['missing_resources'] = True

        return reservation_flags

    def _reallocate(self, allocation):
        """Allocate an alternative host.

        For an active reservation, the failed host is removed from the
        aggregate and the replacement host is added. If no replacement is
        found, the allocation is destroyed.

        :param allocation: allocation to change.
        :return: True if an alternative host was successfully allocated.
        """
        reservation = db_api.reservation_get(allocation['reservation_id'])
        pool = nova.ReservationPool()

        # Remove the failed host from the aggregate.
        if reservation['status'] == status.reservation.ACTIVE:
            host = db_api.host_get(allocation['compute_host_id'])
            pool.remove_computehost(reservation['aggregate_id'],
                                    host['service_name'])

        # Allocate an alternative host for the remaining lease period.
        lease = db_api.lease_get(reservation['lease_id'])
        values = {
            'start_date': max(datetime.datetime.utcnow(),
                              lease['start_date']),
            'end_date': lease['end_date'],
        }
        specs = ['vcpus', 'memory_mb', 'disk_gb', 'affinity', 'amount',
                 'resource_properties']
        for key in specs:
            values[key] = reservation[key]
        changed_hosts = self.pickup_hosts(reservation['id'], values)

        if not changed_hosts['added']:
            db_api.host_allocation_destroy(allocation['id'])
            LOG.warning('Could not find alternative host for reservation %s '
                        '(lease: %s).', reservation['id'], lease['name'])
            return False

        new_host_id = changed_hosts['added'].pop()
        db_api.host_allocation_update(
            allocation['id'], {'compute_host_id': new_host_id})
        if reservation['status'] == status.reservation.ACTIVE:
            # Add the alternative host into the aggregate.
            new_host = db_api.host_get(new_host_id)
            pool.add_computehost(reservation['aggregate_id'],
                                 new_host['service_name'],
                                 stay_in=True)
        LOG.warning('Resource changed for reservation %s (lease: %s).',
                    reservation['id'], lease['name'])
        return True