blueprint host-aggregates: host maintenance - xenapi implementation

this changeset introduces the following:

- refactoring around host-related operations for xenapi
- increased test coverage
- first cut at implementing host evacuation for an XS/XCP host

Change-Id: I8509cdde95f6777ecfa928663b0c4bedbccf5d38
This commit is contained in:
Armando Migliaccio 2012-02-17 16:17:44 +00:00
parent 0c6765a71a
commit 939f082cc2
14 changed files with 321 additions and 166 deletions

View File

@ -1640,7 +1640,9 @@ class HostAPI(base.Base):
def set_host_maintenance(self, context, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
raise NotImplementedError()
return _call_compute_message(self.db, "host_maintenance_mode", context,
host=host, params={"host": host,
"mode": mode})
class AggregateAPI(base.Base):

View File

@ -1463,6 +1463,12 @@ class ComputeManager(manager.SchedulerDependentManager):
"""Reboots, shuts down or powers up the host."""
return self.driver.host_power_action(host, action)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def host_maintenance_mode(self, context, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
return self.driver.host_maintenance_mode(host, mode)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def set_host_enabled(self, context, host=None, enabled=None):
"""Sets the specified host's ability to accept new instances."""

View File

@ -20,7 +20,6 @@ import ast
import contextlib
import datetime
import functools
import json
import os
import re
import stubout
@ -1114,52 +1113,22 @@ class CompareVersionTestCase(test.TestCase):
self.assertTrue(vmops.cmp_version('1.2.3', '1.2.3.4') < 0)
class FakeXenApi(object):
"""Fake XenApi for testing HostState."""
class FakeSR(object):
def get_record(self, ref):
return {'virtual_allocation': 10000,
'physical_utilisation': 20000}
SR = FakeSR()
class FakeSession(object):
"""Fake Session class for HostState testing."""
def async_call_plugin(self, *args):
return None
def wait_for_task(self, *args):
vm = {'total': 10,
'overhead': 20,
'free': 30,
'free-computed': 40}
return json.dumps({'host_memory': vm})
def call_xenapi(self, method, *args):
f = FakeXenApi()
for m in method.split('.'):
f = getattr(f, m)
return f(*args)
class HostStateTestCase(test.TestCase):
class XenAPIHostTestCase(test.TestCase):
"""Tests HostState, which holds metrics from XenServer that get
reported back to the Schedulers."""
@classmethod
def _fake_safe_find_sr(cls, session):
"""None SR ref since we're ignoring it in FakeSR."""
return None
def setUp(self):
super(XenAPIHostTestCase, self).setUp()
self.stubs = stubout.StubOutForTesting()
self.flags(xenapi_connection_url='test_url',
xenapi_connection_password='test_pass')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
xenapi_fake.reset()
xenapi_fake.create_local_srs()
self.conn = xenapi_conn.get_connection(False)
def test_host_state(self):
self.stubs = stubout.StubOutForTesting()
self.stubs.Set(vm_utils.VMHelper, 'safe_find_sr',
self._fake_safe_find_sr)
host_state = xenapi_conn.HostState(FakeSession())
stats = host_state._stats
stats = self.conn.get_host_stats()
self.assertEquals(stats['disk_total'], 10000)
self.assertEquals(stats['disk_used'], 20000)
self.assertEquals(stats['host_memory_total'], 10)
@ -1167,6 +1136,36 @@ class HostStateTestCase(test.TestCase):
self.assertEquals(stats['host_memory_free'], 30)
self.assertEquals(stats['host_memory_free_computed'], 40)
def _test_host_action(self, method, action, expected=None):
result = method('host', action)
if not expected:
expected = action
self.assertEqual(result, expected)
def test_host_reboot(self):
self._test_host_action(self.conn.host_power_action, 'reboot')
def test_host_shutdown(self):
self._test_host_action(self.conn.host_power_action, 'shutdown')
def test_host_startup(self):
self.assertRaises(NotImplementedError,
self.conn.host_power_action, 'host', 'startup')
def test_host_maintenance_on(self):
self._test_host_action(self.conn.host_maintenance_mode,
True, 'on_maintenance')
def test_host_maintenance_off(self):
self._test_host_action(self.conn.host_maintenance_mode,
False, 'off_maintenance')
def test_set_enable_host_enable(self):
self._test_host_action(self.conn.set_host_enabled, True, 'enabled')
def test_set_enable_host_disable(self):
self._test_host_action(self.conn.set_host_enabled, False, 'disabled')
class XenAPIAutoDiskConfigTestCase(test.TestCase):
def setUp(self):

View File

@ -206,8 +206,7 @@ class FakeSessionForVMTests(fake.SessionBase):
def VM_start_on(self, _1, vm_ref, host_ref, _2, _3):
vm_rec = self.VM_start(_1, vm_ref, _2, _3)
host_rec = fake.get_record('host', host_ref)
vm_rec['resident_on'] = host_rec['uuid']
vm_rec['resident_on'] = host_ref
def VM_snapshot(self, session_ref, vm_ref, label):
status = "Running"

View File

@ -544,6 +544,11 @@ class ComputeDriver(object):
"""Reboots, shuts down or powers up the host."""
raise NotImplementedError()
def host_maintenance_mode(self, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
raise NotImplementedError()
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
# TODO(Vek): Need to pass context in for access to auth_token

View File

@ -321,6 +321,11 @@ class FakeConnection(driver.ComputeDriver):
"""Reboots, shuts down or powers up the host."""
pass
def host_maintenance_mode(self, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
pass
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
pass

View File

@ -2108,6 +2108,11 @@ class LibvirtConnection(driver.ComputeDriver):
"""Reboots, shuts down or powers up the host."""
raise NotImplementedError()
def host_maintenance_mode(self, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
raise NotImplementedError()
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
pass

View File

@ -203,11 +203,16 @@ class VMWareESXConnection(driver.ComputeDriver):
def host_power_action(self, host, action):
"""Reboots, shuts down or powers up the host."""
pass
raise NotImplementedError()
def host_maintenance_mode(self, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
raise NotImplementedError()
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
pass
raise NotImplementedError()
def plug_vifs(self, instance, network_info):
"""Plug VIFs into networks."""

View File

@ -80,11 +80,12 @@ def log_db_contents(msg=None):
def reset():
for c in _CLASSES:
_db_content[c] = {}
create_host('fake')
host = create_host('fake')
create_vm('fake',
'Running',
is_a_template=False,
is_control_domain=True)
is_control_domain=True,
host_ref=host)
def reset_table(table):
@ -112,14 +113,15 @@ def create_network(name_label, bridge):
def create_vm(name_label, status,
is_a_template=False, is_control_domain=False):
is_a_template=False, is_control_domain=False, host_ref=None):
domid = status == 'Running' and random.randrange(1, 1 << 16) or -1
return _create_object('VM',
{'name_label': name_label,
'domid': domid,
'power-state': status,
'is_a_template': is_a_template,
'is_control_domain': is_control_domain})
'is_control_domain': is_control_domain,
'resident_on': host_ref})
def destroy_vm(vm_ref):
@ -220,12 +222,16 @@ def create_local_srs():
other_config={'i18n-original-value-name_label':
'Local storage',
'i18n-key': 'local-storage'},
physical_utilisation=20000,
virtual_allocation=10000,
host_ref=host_ref)
create_sr(name_label='Local storage ISO',
type='iso',
other_config={'i18n-original-value-name_label':
'Local storage ISO',
'i18n-key': 'local-storage-iso'},
physical_utilisation=40000,
virtual_allocation=80000,
host_ref=host_ref)
@ -234,13 +240,14 @@ def create_sr(**kwargs):
'SR',
{'name_label': kwargs.get('name_label'),
'type': kwargs.get('type'),
'content_type': 'user',
'shared': False,
'physical_size': str(1 << 30),
'physical_utilisation': str(0),
'virtual_allocation': str(0),
'other_config': kwargs.get('other_config'),
'VDIs': []})
'content_type': kwargs.get('type', 'user'),
'shared': kwargs.get('shared', False),
'physical_size': kwargs.get('physical_size', str(1 << 30)),
'physical_utilisation': str(
kwargs.get('physical_utilisation', 0)),
'virtual_allocation': str(kwargs.get('virtual_allocation', 0)),
'other_config': kwargs.get('other_config', {}),
'VDIs': kwargs.get('VDIs', [])})
pbd_ref = create_pbd('', kwargs.get('host_ref'), sr_ref, True)
_db_content['SR'][sr_ref]['PBDs'] = [pbd_ref]
return sr_ref
@ -254,6 +261,7 @@ def _create_local_pif(host_ref):
'VLAN': -1,
'device': 'fake0',
'host_uuid': host_ref})
return pif_ref
def _create_object(table, obj):
@ -494,6 +502,18 @@ class SessionBase(object):
return ''
elif (plugin, method) == ('migration', 'transfer_vhd'):
return ''
elif (plugin, method) == ('xenhost', 'host_data'):
return json.dumps({'host_memory': {'total': 10,
'overhead': 20,
'free': 30,
'free-computed': 40}, })
elif (plugin == 'xenhost' and method in ['host_reboot',
'host_startup',
'host_shutdown']):
return json.dumps({"power_action": method[5:]})
elif (plugin, method) == ('xenhost', 'set_host_enabled'):
enabled = 'enabled' if _5.get('enabled') == 'true' else 'disabled'
return json.dumps({"status": enabled})
else:
raise Exception('No simulation in host_call_plugin for %s,%s' %
(plugin, method))
@ -679,7 +699,6 @@ class SessionBase(object):
# Add RO fields
if cls == 'VM':
obj['power_state'] = 'Halted'
return ref
def _destroy(self, name, params):

201
nova/virt/xenapi/host.py Normal file
View File

@ -0,0 +1,201 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Citrix Systems, Inc.
# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Management class for host-related functions (start, reboot, etc).
"""
import logging
import json
import random
from nova import context
from nova import db
from nova import exception
from nova.virt.xenapi import vm_utils
LOG = logging.getLogger(__name__)
class Host(object):
    """
    Implements host related operations.

    Power and enable/disable requests are forwarded to the ``xenhost``
    plugin through ``call_xenhost``; maintenance mode is implemented here
    by migrating guest VMs to the other hosts known to the session.
    """

    def __init__(self, session):
        """Keep the session and the XenAPI module it imported.

        :param session: session object providing ``get_imported_xenapi``,
                        ``call_xenapi`` and ``get_xenapi_host``.
        """
        self.XenAPI = session.get_imported_xenapi()
        self._session = session

    @staticmethod
    def _response_field(response, key):
        """Extract ``key`` from a plugin response, tolerating error values.

        ``call_xenhost`` returns ``None`` when the plugin output is not
        valid JSON and a bare error detail when the call fails. Neither
        supports ``.get``, so any non-dict response is handed back as is.

        :param response: value returned by ``call_xenhost``
        :param key: field to read from a dict response
        :returns: ``response[key]`` if present, otherwise ``response``
        """
        if isinstance(response, dict):
            return response.get(key, response)
        return response

    def host_power_action(self, host, action):
        """Reboots or shuts down the host.

        :param host: name of the host (not used by this implementation)
        :param action: "reboot" or "shutdown"; other values raise KeyError
        :returns: the "power_action" field of the plugin response, or the
                  raw response when that field is unavailable
        """
        args = {"action": json.dumps(action)}
        methods = {"reboot": "host_reboot", "shutdown": "host_shutdown"}
        response = call_xenhost(self._session, methods[action], args)
        return self._response_field(response, "power_action")

    def host_maintenance_mode(self, host, mode):
        """Start/Stop host maintenance window. On start, it triggers
        guest VMs evacuation.

        :param host: compute host being put in (or taken out of) maintenance
        :param mode: truthy to start maintenance, falsy to stop it
        :returns: 'on_maintenance' when every guest VM was migrated away,
                  'off_maintenance' when ``mode`` is falsy
        :raises: exception.NoValidHost if at least one guest VM could not
                 be migrated to any other host
        """
        if not mode:
            return 'off_maintenance'

        this_host = self._session.get_xenapi_host()
        host_list = [host_ref for host_ref in
                     self._session.call_xenapi('host.get_all')
                     if host_ref != this_host]
        migrations_counter = vm_counter = 0
        ctxt = context.get_admin_context()
        for vm_ref, vm_rec in vm_utils.VMHelper.list_vms(self._session):
            # Ensure only guest instances are migrated. The lookup is done
            # once per VM, not once per candidate host, so each guest is
            # counted exactly once however many hosts have to be tried.
            uuid = vm_rec['other_config'].get('nova_uuid')
            if not uuid:
                name = vm_rec['name_label']
                uuid = _uuid_find(ctxt, host, name)
                if not uuid:
                    msg = _('Instance %(name)s running on %(host)s'
                            ' could not be found in the database:'
                            ' assuming it is a worker VM and skip'
                            'ping migration to a new host')
                    LOG.info(msg % {'name': name, 'host': host})
                    continue
            instance = db.instance_get_by_uuid(ctxt, uuid)
            vm_counter += 1
            for host_ref in host_list:
                try:
                    self._session.call_xenapi('VM.pool_migrate',
                                              vm_ref, host_ref, {})
                    new_host = _host_find(ctxt, self._session,
                                          host, host_ref)
                    db.instance_update(ctxt,
                                       instance.id, {'host': new_host})
                    migrations_counter += 1
                    break
                except self.XenAPI.Failure:
                    # Try the next candidate host for this VM.
                    LOG.exception(_('Unable to migrate VM %(vm_ref)s '
                                    'from %(host)s') %
                                  {'vm_ref': vm_ref, 'host': host})
        if vm_counter == migrations_counter:
            return 'on_maintenance'
        raise exception.NoValidHost(reason='Unable to find suitable '
                                           'host for VMs evacuation')

    def set_host_enabled(self, host, enabled):
        """Sets the specified host's ability to accept new instances.

        :param host: name of the host (not used by this implementation)
        :param enabled: value JSON-encoded and passed to the plugin
        :returns: the "status" field of the plugin response, or the raw
                  response when that field is unavailable
        """
        args = {"enabled": json.dumps(enabled)}
        response = call_xenhost(self._session, "set_host_enabled", args)
        return self._response_field(response, "status")
class HostState(object):
    """Manages information about the XenServer host this compute
    node is running on.
    """

    def __init__(self, session):
        """Store the session and take an initial stats snapshot.

        :param session: session object passed to ``call_xenhost`` and used
                        for ``call_xenapi``
        """
        super(HostState, self).__init__()
        self._session = session
        self._stats = {}
        self.update_status()

    def get_host_stats(self, refresh=False):
        """Return the current state of the host. If 'refresh' is
        True, run the update first.
        """
        if refresh:
            self.update_status()
        return self._stats

    def update_status(self):
        """Since under Xenserver, a compute node runs on a given host,
        we can get host status information using xenapi.

        The previously cached stats are kept untouched when the plugin
        returns nothing usable or when no SR can be found.
        """
        LOG.debug(_("Updating host stats"))
        data = call_xenhost(self._session, "host_data", {})
        if not data:
            return
        if not isinstance(data, dict):
            # call_xenhost hands back a bare error detail when the plugin
            # call fails; it cannot be augmented with disk/memory keys.
            LOG.error(_("Unable to get updated status: %s") % data)
            return

        try:
            # Get the SR usage
            sr_ref = vm_utils.VMHelper.safe_find_sr(self._session)
        except exception.NotFound as e:
            # No SR configured
            LOG.error(_("Unable to get SR for this host: %s") % e)
            return
        sr_rec = self._session.call_xenapi("SR.get_record", sr_ref)
        total = int(sr_rec["virtual_allocation"])
        used = int(sr_rec["physical_utilisation"])
        data["disk_total"] = total
        data["disk_used"] = used
        data["disk_available"] = total - used

        # Flatten the nested memory dict into top-level host_memory_* keys.
        host_memory = data.get('host_memory', None)
        if host_memory:
            data["host_memory_total"] = host_memory.get('total', 0)
            data["host_memory_overhead"] = host_memory.get('overhead', 0)
            data["host_memory_free"] = host_memory.get('free', 0)
            data["host_memory_free_computed"] = host_memory.get(
                'free-computed', 0)
            del data['host_memory']
        self._stats = data
def call_xenhost(session, method, arg_dict):
    """Run a method of the ``xenhost`` plugin and decode its JSON reply.

    Shared by every caller that talks to the xenhost plugin so the task
    handling and error handling live in one place.

    :param session: object providing ``get_imported_xenapi``,
                    ``async_call_plugin`` and ``wait_for_task``
    :param method: name of the plugin method to invoke
    :param arg_dict: arguments forwarded to the plugin as ``args``
    :returns: the decoded JSON reply; ``None`` if the reply is not valid
              JSON; ``e.details[1]`` if the call raises ``XenAPI.Failure``
    """
    # Create a task ID as something that won't match any instance ID
    pseudo_task_id = str(random.randint(-80000, -70000))
    xenapi_module = session.get_imported_xenapi()
    try:
        pending = session.async_call_plugin("xenhost", method, args=arg_dict)
        # An empty/None result is normalised to the JSON empty string.
        raw_reply = (session.wait_for_task(pending, pseudo_task_id) or
                     json.dumps(""))
        return json.loads(raw_reply)
    except ValueError:
        LOG.exception(_("Unable to get updated status"))
        return None
    except xenapi_module.Failure as failure:
        LOG.error(_("The call to %(method)s returned "
                    "an error: %(e)s.") % {'method': method, 'e': failure})
        return failure.details[1]
def _uuid_find(context, host, name_label):
    """Look up the uuid of the instance on ``host`` named ``name_label``.

    :returns: the uuid of the first matching instance, or None when no
              instance on that host has this name
    """
    on_host = db.instance_get_all_by_host(context, host)
    matching_uuids = (inst['uuid'] for inst in on_host
                      if inst.name == name_label)
    return next(matching_uuids, None)
def _host_find(context, session, src, dst):
    """Map a xenapi host reference to the compute host managing it.

    :param src: the compute host being put in maintenance (source of VMs)
    :param dst: the hypervisor host reference (destination of VMs)
    :return: the compute host that manages dst
    :raises: exception.NoValidHost when the uuid of dst does not appear
             in the metadata of the aggregate that src belongs to
    """
    # NOTE: this would be a lot simpler if nova-compute stored
    # FLAGS.host in the XenServer host's other-config map.
    # TODO: improve according the note above
    aggregate = db.aggregate_get_by_host(context, src)
    dst_uuid = session.call_xenapi('host.get_record', dst)['uuid']
    # Aggregate metadata maps compute host name -> hypervisor host uuid.
    owners = [name for name, known_uuid in aggregate.metadetails.iteritems()
              if known_uuid == dst_uuid]
    if owners:
        return owners[0]

    raise exception.NoValidHost(reason='Host %(host_uuid)s could not be found '
                                'from aggregate metadata: %(metadata)s.' %
                                {'host_uuid': dst_uuid,
                                 'metadata': aggregate.metadetails})

View File

@ -175,8 +175,8 @@ class VMHelper(HelperBase):
'memory_target': mem,
'name_description': '',
'name_label': instance.name,
'other_config': {'allowvssprovider': False},
'other_config': {},
'other_config': {'allowvssprovider': str(False),
'nova_uuid': str(instance.uuid), },
'PCI_bus': '',
'platform': {'acpi': 'true', 'apic': 'true', 'pae': 'true',
'viridian': 'true', 'timeoffset': '0'},
@ -952,7 +952,8 @@ class VMHelper(HelperBase):
@classmethod
def list_vms(cls, session):
for vm_ref, vm_rec in cls.get_all_refs_and_recs(session, 'VM'):
if vm_rec["is_a_template"] or vm_rec["is_control_domain"]:
if (vm_rec["resident_on"] != session.get_xenapi_host() or
vm_rec["is_a_template"] or vm_rec["is_control_domain"]):
continue
else:
yield vm_ref, vm_rec

View File

@ -1449,43 +1449,6 @@ class VMOps(object):
return {'host': FLAGS.vncserver_proxyclient_address, 'port': 80,
'internal_access_path': path}
def host_power_action(self, host, action):
"""Reboots or shuts down the host."""
args = {"action": json.dumps(action)}
methods = {"reboot": "host_reboot", "shutdown": "host_shutdown"}
json_resp = self._call_xenhost(methods[action], args)
resp = json.loads(json_resp)
return resp["power_action"]
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
args = {"enabled": json.dumps(enabled)}
xenapi_resp = self._call_xenhost("set_host_enabled", args)
try:
resp = json.loads(xenapi_resp)
except TypeError as e:
# Already logged; return the message
return xenapi_resp.details[-1]
return resp["status"]
def _call_xenhost(self, method, arg_dict):
"""There will be several methods that will need this general
handling for interacting with the xenhost plugin, so this abstracts
out that behavior.
"""
# Create a task ID as something that won't match any instance ID
task_id = random.randint(-80000, -70000)
try:
task = self._session.async_call_plugin("xenhost", method,
args=arg_dict)
#args={"params": arg_dict})
ret = self._session.wait_for_task(task, str(task_id))
except self.XenAPI.Failure as e:
ret = e
LOG.error(_("The call to %(method)s returned an error: %(e)s.")
% locals())
return ret
def inject_network_info(self, instance, network_info, vm_ref=None):
"""
Generate the network info and make calls to place it into the

View File

@ -58,9 +58,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
"""
import contextlib
import json
import random
import sys
import time
import urlparse
import xmlrpclib
@ -73,11 +70,11 @@ from eventlet import timeout
from nova import context
from nova import db
from nova import exception
from nova import utils
from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.virt import driver
from nova.virt.xenapi import host
from nova.virt.xenapi import pool
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import VMOps
@ -178,6 +175,7 @@ class XenAPIConnection(driver.ComputeDriver):
self._session = XenAPISession(url, user, pw)
self._volumeops = VolumeOps(self._session)
self._host_state = None
self._host = host.Host(self._session)
self._product_version = self._session.get_product_version()
self._vmops = VMOps(self._session, self._product_version)
self._initiator = None
@ -186,7 +184,7 @@ class XenAPIConnection(driver.ComputeDriver):
@property
def host_state(self):
if not self._host_state:
self._host_state = HostState(self._session)
self._host_state = host.HostState(self._session)
return self._host_state
def init_host(self, host):
@ -481,14 +479,19 @@ class XenAPIConnection(driver.ComputeDriver):
raise an exception.
"""
if action in ("reboot", "shutdown"):
return self._vmops.host_power_action(host, action)
return self._host.host_power_action(host, action)
else:
msg = _("Host startup on XenServer is not supported.")
raise NotImplementedError(msg)
def set_host_enabled(self, host, enabled):
"""Sets the specified host's ability to accept new instances."""
return self._vmops.set_host_enabled(host, enabled)
return self._host.set_host_enabled(host, enabled)
def host_maintenance_mode(self, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
return self._host.host_maintenance_mode(host, mode)
def add_to_aggregate(self, context, aggregate, host, **kwargs):
"""Add a compute host to an aggregate."""
@ -671,65 +674,6 @@ class XenAPISession(object):
raise
class HostState(object):
"""Manages information about the XenServer host this compute
node is running on.
"""
def __init__(self, session):
super(HostState, self).__init__()
self._session = session
self._stats = {}
self.update_status()
def get_host_stats(self, refresh=False):
"""Return the current state of the host. If 'refresh' is
True, run the update first.
"""
if refresh:
self.update_status()
return self._stats
def update_status(self):
"""Since under Xenserver, a compute node runs on a given host,
we can get host status information using xenapi.
"""
LOG.debug(_("Updating host stats"))
# Make it something unlikely to match any actual instance UUID
task_id = random.randint(-80000, -70000)
task = self._session.async_call_plugin("xenhost", "host_data", {})
task_result = self._session.wait_for_task(task, str(task_id))
if not task_result:
task_result = json.dumps("")
try:
data = json.loads(task_result)
except ValueError as e:
# Invalid JSON object
LOG.error(_("Unable to get updated status: %s") % e)
return
# Get the SR usage
try:
sr_ref = vm_utils.VMHelper.safe_find_sr(self._session)
except exception.NotFound as e:
# No SR configured
LOG.error(_("Unable to get SR for this host: %s") % e)
return
sr_rec = self._session.call_xenapi("SR.get_record", sr_ref)
total = int(sr_rec["virtual_allocation"])
used = int(sr_rec["physical_utilisation"])
data["disk_total"] = total
data["disk_used"] = used
data["disk_available"] = total - used
host_memory = data.get('host_memory', None)
if host_memory:
data["host_memory_total"] = host_memory.get('total', 0)
data["host_memory_overhead"] = host_memory.get('overhead', 0)
data["host_memory_free"] = host_memory.get('free', 0)
data["host_memory_free_computed"] = host_memory.get(
'free-computed', 0)
del data['host_memory']
self._stats = data
def _parse_xmlrpc_value(val):
"""Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""

View File

@ -253,7 +253,8 @@ def _power_action(action):
result = _run_command("xe vm-shutdown --multiple power-state=running")
if result:
raise pluginlib.PluginError(result)
cmds = {"reboot": "xe host-reboot", "startup": "xe host-power-on",
cmds = {"reboot": "xe host-reboot",
"startup": "xe host-power-on",
"shutdown": "xe host-shutdown"}
result = _run_command(cmds[action])
# Should be empty string