Make XenAPI use Aggregate object

This makes the XenAPI driver use the Aggregate object for its work,
and avoids the need to call back through virtapi to conductor
directly. It also allows us to convert the two aggregate-related
compute manager methods fully to new-world objects.
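
The shape of the change, as a minimal sketch (not verbatim from the
diff below; the helper functions here are illustrative, while
pool_states, Aggregate and AggregateList are the real names involved):

    from nova import context as nova_context
    from nova.objects import aggregate as aggregate_obj
    from nova.virt.xenapi import pool_states

    def find_pool_aggregate(host):
        # Before: self._virtapi.aggregate_get_by_host(...) routed the
        # lookup through virtapi and conductor. The object now does
        # its own query/RPC:
        ctxt = nova_context.get_admin_context()
        return aggregate_obj.AggregateList.get_by_host(
            ctxt, host, key=pool_states.POOL_FLAG)[0]

    def set_pool_active(aggregate, host, host_uuid):
        # Before: self._virtapi.aggregate_metadata_add(context,
        # aggregate, metadata). update_metadata() persists the keys
        # directly on the object:
        aggregate.update_metadata({'master_compute': host,
                                   host: host_uuid,
                                   pool_states.KEY: pool_states.ACTIVE})

    def forget_host(aggregate, host):
        # Before: self._virtapi.aggregate_metadata_delete(context,
        # aggregate, host). A None value deletes the key:
        aggregate.update_metadata({host: None})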

Related to blueprint compute-manager-objects
Related to blueprint virt-objects

Closes-bug: #1232179
Change-Id: Ib38ab0e4d6feefebda37888f150752167474b693
Author: Dan Smith
Date:   2013-10-08 12:54:27 -07:00
Parent: 9fb39ab5ea
Commit: f2f58eef93

6 changed files with 51 additions and 67 deletions

nova/compute/manager.py

@@ -5016,9 +5016,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         if not aggregate:
             aggregate = aggregate_obj.Aggregate.get_by_id(context,
                                                           aggregate_id)
-        # NOTE(danms): until the drivers support objects, use primitives
-        aggregate = obj_base.obj_to_primitive(aggregate)
         try:
             self.driver.add_to_aggregate(context, aggregate, host,
                                          slave_info=slave_info)
@@ -5040,9 +5037,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         if not aggregate:
             aggregate = aggregate_obj.Aggregate.get_by_id(context,
                                                           aggregate_id)
-        # NOTE(danms): until the drivers support objects, use primitives
-        aggregate = obj_base.obj_to_primitive(aggregate)
         try:
             self.driver.remove_from_aggregate(context, aggregate, host,
                                               slave_info=slave_info)

nova/tests/virt/xenapi/test_xenapi.py

@@ -37,6 +37,7 @@ from nova import context
 from nova import crypto
 from nova import db
 from nova import exception
+from nova.objects import aggregate as aggregate_obj
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import importutils
 from nova.openstack.common import jsonutils
@@ -48,6 +49,7 @@ from nova.tests import fake_network
 from nova.tests import fake_processutils
 import nova.tests.image.fake as fake_image
 from nova.tests import matchers
+from nova.tests.objects import test_aggregate
 from nova.tests.virt.xenapi import stubs
 from nova.virt import fake
 from nova.virt.xenapi import agent
@@ -1397,10 +1399,10 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
         def fake_aggregate_get(context, host, key):
             if find_aggregate:
-                return [{'fake': 'aggregate'}]
+                return [test_aggregate.fake_aggregate]
             else:
                 return []
-        self.stubs.Set(self.conn.virtapi, 'aggregate_get_by_host',
+        self.stubs.Set(db, 'aggregate_get_by_host',
                        fake_aggregate_get)

         def fake_host_find(context, session, src, dst):
@@ -1454,7 +1456,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
         def fake_aggregate_get_by_host(self, *args, **kwargs):
             was['called'] = True
             raise test.TestingException()
-        self.stubs.Set(self.conn._session._virtapi, "aggregate_get_by_host",
+        self.stubs.Set(db, "aggregate_get_by_host",
                        fake_aggregate_get_by_host)

         self.stubs.Set(self.conn._session, "is_slave", True)
@@ -2981,18 +2983,17 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
                        fake_pool_set_name_label)
         self.conn._session.call_xenapi("pool.create", {"name": "asdf"})

-        values = {"name": 'fake_aggregate',
-                  'metadata': {'availability_zone': 'fake_zone'}}
-        result = db.aggregate_create(self.context, values)
         metadata = {'availability_zone': 'fake_zone',
                     pool_states.POOL_FLAG: "XenAPI",
                     pool_states.KEY: pool_states.CREATED}
-        db.aggregate_metadata_add(self.context, result['id'], metadata)
-        db.aggregate_host_add(self.context, result['id'], "host")
-        aggregate = db.aggregate_get(self.context, result['id'])
-        self.assertEqual(["host"], aggregate['hosts'])
-        self.assertEqual(metadata, aggregate['metadetails'])
+        aggregate = aggregate_obj.Aggregate()
+        aggregate.name = 'fake_aggregate'
+        aggregate.metadata = dict(metadata)
+        aggregate.create(self.context)
+        aggregate.add_host('host')
+        self.assertEqual(["host"], aggregate.hosts)
+        self.assertEqual(metadata, aggregate.metadata)

         self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
         self.assertTrue(fake_pool_set_name_label.called)
@@ -3053,18 +3054,18 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
                           aggr_zone='fake_zone',
                           aggr_state=pool_states.CREATED,
                           hosts=['host'], metadata=None):
-        values = {"name": aggr_name}
-        result = db.aggregate_create(self.context, values,
-                                     metadata={'availability_zone': aggr_zone})
-        pool_flag = {pool_states.POOL_FLAG: "XenAPI",
-                     pool_states.KEY: aggr_state}
-        db.aggregate_metadata_add(self.context, result['id'], pool_flag)
-        for host in hosts:
-            db.aggregate_host_add(self.context, result['id'], host)
+        aggregate = aggregate_obj.Aggregate()
+        aggregate.name = aggr_name
+        aggregate.metadata = {'availability_zone': aggr_zone,
+                              pool_states.POOL_FLAG: 'XenAPI',
+                              pool_states.KEY: aggr_state,
+                              }
         if metadata:
-            db.aggregate_metadata_add(self.context, result['id'], metadata)
-        return db.aggregate_get(self.context, result['id'])
+            aggregate.metadata.update(metadata)
+        aggregate.create(self.context)
+        for host in hosts:
+            aggregate.add_host(host)
+        return aggregate

     def test_add_host_to_aggregate_invalid_changing_status(self):
         """Ensure InvalidAggregateAction is raised when adding host while
@@ -3200,7 +3201,7 @@ class HypervisorPoolTestCase(test.NoDBTestCase):
     fake_aggregate = {
         'id': 98,
         'hosts': [],
-        'metadetails': {
+        'metadata': {
             'master_compute': 'master',
             pool_states.POOL_FLAG: {},
             pool_states.KEY: {}
@@ -3443,13 +3444,10 @@ class XenAPILiveMigrateTestCase(stubs.XenAPITestBase):
         stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
         self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)

-        class fake_aggregate:
-            def __init__(self):
-                self.metadetails = {"host": "test_host_uuid"}
-
         def fake_aggregate_get_by_host(context, host, key=None):
             self.assertEqual(CONF.host, host)
-            return [fake_aggregate()]
+            return [dict(test_aggregate.fake_aggregate,
+                         metadetails={"host": "test_host_uuid"})]

         self.stubs.Set(db, "aggregate_get_by_host",
                        fake_aggregate_get_by_host)
@@ -3460,13 +3458,10 @@ class XenAPILiveMigrateTestCase(stubs.XenAPITestBase):
         stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
         self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)

-        class fake_aggregate:
-            def __init__(self):
-                self.metadetails = {"dest_other": "test_host_uuid"}
-
         def fake_aggregate_get_by_host(context, host, key=None):
             self.assertEqual(CONF.host, host)
-            return [fake_aggregate()]
+            return [dict(test_aggregate.fake_aggregate,
+                         metadetails={"dest_other": "test_host_uuid"})]

         self.stubs.Set(db, "aggregate_get_by_host",
                        fake_aggregate_get_by_host)

nova/virt/xenapi/driver.py

@@ -49,6 +49,7 @@ from oslo.config import cfg
 from nova import context
 from nova import exception
+from nova.objects import aggregate as aggregate_obj
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import jsonutils
 from nova.openstack.common import log as logging
@@ -717,7 +718,7 @@ class XenAPISession(object):
     def _get_host_uuid(self):
         if self.is_slave:
-            aggr = self._virtapi.aggregate_get_by_host(
+            aggr = aggregate_obj.AggregateList.get_by_host(
                 context.get_admin_context(),
                 CONF.host, key=pool_states.POOL_FLAG)[0]
             if not aggr:

nova/virt/xenapi/host.py

@@ -24,6 +24,7 @@ from nova.compute import vm_states
 from nova import conductor
 from nova import context
 from nova import exception
+from nova.objects import aggregate as aggregate_obj
 from nova.objects import instance as instance_obj
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import jsonutils
@@ -79,7 +80,7 @@ class Host(object):
                     instance = instance_obj.Instance.get_by_uuid(ctxt, uuid)
                     vm_counter = vm_counter + 1

-            aggregate = self._virtapi.aggregate_get_by_host(
+            aggregate = aggregate_obj.AggregateList.get_by_host(
                 ctxt, host, key=pool_states.POOL_FLAG)
             if not aggregate:
                 msg = _('Aggregate for host %(host)s could not be'

nova/virt/xenapi/pool.py

@@ -64,8 +64,7 @@ class ResourcePool(object):
         try:
             if set_error:
                 metadata = {pool_states.KEY: pool_states.ERROR}
-                self._virtapi.aggregate_metadata_add(context, aggregate,
-                                                     metadata)
+                aggregate.update_metadata(metadata)
             op(context, aggregate, host)
         except Exception:
             LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
@@ -74,23 +73,21 @@ class ResourcePool(object):
     def add_to_aggregate(self, context, aggregate, host, slave_info=None):
         """Add a compute host to an aggregate."""
-        if not pool_states.is_hv_pool(aggregate['metadetails']):
+        if not pool_states.is_hv_pool(aggregate['metadata']):
             return

         invalid = {pool_states.CHANGING: 'setup in progress',
                    pool_states.DISMISSED: 'aggregate deleted',
                    pool_states.ERROR: 'aggregate in error'}
-        if (aggregate['metadetails'][pool_states.KEY] in invalid.keys()):
+        if (aggregate['metadata'][pool_states.KEY] in invalid.keys()):
             raise exception.InvalidAggregateAction(
                 action='add host',
                 aggregate_id=aggregate['id'],
-                reason=aggregate['metadetails'][pool_states.KEY])
+                reason=aggregate['metadata'][pool_states.KEY])

-        if (aggregate['metadetails'][pool_states.KEY] == pool_states.CREATED):
-            self._virtapi.aggregate_metadata_add(context, aggregate,
-                                                 {pool_states.KEY:
-                                                  pool_states.CHANGING})
+        if (aggregate['metadata'][pool_states.KEY] == pool_states.CREATED):
+            aggregate.update_metadata({pool_states.KEY: pool_states.CHANGING})
         if len(aggregate['hosts']) == 1:
             # this is the first host of the pool -> make it master
             self._init_pool(aggregate['id'], aggregate['name'])
@@ -98,12 +95,11 @@ class ResourcePool(object):
                 metadata = {'master_compute': host,
                             host: self._host_uuid,
                             pool_states.KEY: pool_states.ACTIVE}
-                self._virtapi.aggregate_metadata_add(context, aggregate,
-                                                     metadata)
+                aggregate.update_metadata(metadata)
         else:
             # the pool is already up and running, we need to figure out
             # whether we can serve the request from this host or not.
-            master_compute = aggregate['metadetails']['master_compute']
+            master_compute = aggregate['metadata']['master_compute']
             if master_compute == CONF.host and master_compute != host:
                 # this is the master -> do a pool-join
                 # To this aim, nova compute on the slave has to go down.
@@ -113,8 +109,7 @@ class ResourcePool(object):
                     slave_info.get('url'), slave_info.get('user'),
                     slave_info.get('passwd'))
                 metadata = {host: slave_info.get('xenhost_uuid'), }
-                self._virtapi.aggregate_metadata_add(context, aggregate,
-                                                     metadata)
+                aggregate.update_metadata(metadata)
             elif master_compute and master_compute != host:
                 # send rpc cast to master, asking to add the following
                 # host with specified credentials.
@@ -126,26 +121,25 @@ class ResourcePool(object):
     def remove_from_aggregate(self, context, aggregate, host, slave_info=None):
         """Remove a compute host from an aggregate."""
         slave_info = slave_info or dict()
-        if not pool_states.is_hv_pool(aggregate['metadetails']):
+        if not pool_states.is_hv_pool(aggregate['metadata']):
             return

         invalid = {pool_states.CREATED: 'no hosts to remove',
                    pool_states.CHANGING: 'setup in progress',
                    pool_states.DISMISSED: 'aggregate deleted', }
-        if aggregate['metadetails'][pool_states.KEY] in invalid.keys():
+        if aggregate['metadata'][pool_states.KEY] in invalid.keys():
             raise exception.InvalidAggregateAction(
                 action='remove host',
                 aggregate_id=aggregate['id'],
-                reason=invalid[aggregate['metadetails'][pool_states.KEY]])
+                reason=invalid[aggregate['metadata'][pool_states.KEY]])

-        master_compute = aggregate['metadetails']['master_compute']
+        master_compute = aggregate['metadata']['master_compute']
         if master_compute == CONF.host and master_compute != host:
             # this is the master -> instruct it to eject a host from the pool
-            host_uuid = aggregate['metadetails'][host]
+            host_uuid = aggregate['metadata'][host]
             self._eject_slave(aggregate['id'],
                               slave_info.get('compute_uuid'), host_uuid)
-            self._virtapi.aggregate_metadata_delete(context, aggregate,
-                                                    host)
+            aggregate.update_metadata({host: None})
         elif master_compute == host:
             # Remove master from its own pool -> destroy pool only if the
             # master is on its own, otherwise raise fault. Destroying a
@@ -160,9 +154,7 @@ class ResourcePool(object):
                           'from the pool; pool not empty')
                           % host)
             self._clear_pool(aggregate['id'])
-            for key in ['master_compute', host]:
-                self._virtapi.aggregate_metadata_delete(context,
-                                                        aggregate, key)
+            aggregate.update_metadata({'master_compute': None, host: None})
         elif master_compute and master_compute != host:
             # A master exists -> forward pool-eject request to master
             slave_info = self._create_slave_info()

nova/virt/xenapi/vmops.py

@@ -37,6 +37,7 @@ from nova.compute import vm_mode
 from nova.compute import vm_states
 from nova import context as nova_context
 from nova import exception
+from nova.objects import aggregate as aggregate_obj
 from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import importutils
@@ -1726,12 +1727,12 @@ class VMOps(object):
                               network_info=network_info)

     def _get_host_uuid_from_aggregate(self, context, hostname):
-        current_aggregate = self._virtapi.aggregate_get_by_host(
+        current_aggregate = aggregate_obj.AggregateList.get_by_host(
             context, CONF.host, key=pool_states.POOL_FLAG)[0]
         if not current_aggregate:
             raise exception.AggregateHostNotFound(host=CONF.host)
         try:
-            return current_aggregate.metadetails[hostname]
+            return current_aggregate.metadata[hostname]
         except KeyError:
             reason = _('Destination host:%s must be in the same '
                        'aggregate as the source server') % hostname