Aggregate create and destroy work against API db

Make aggregate.create() and destroy() use the API rather than cell database.
Also block aggregate creation until main database empty. This makes
Aggregate.create() fail until the main database has had all of its aggreagtes
migrated. Since we want to avoid any overlap or clashes in integer ids we
need to enforce this.

Note that this includes a change to a notification sample, which encodes
the function and module of a sample exception (which happens to be during
an aggregate operation). Since the notifications are encoding internal
function names, which can and will change over time, this is an expected
change.

blueprint cells-aggregate-api-db

Co-Authored-By: Dan Smith <dansmith@redhat.com>
Change-Id: Ida70e3c05f93d6044ddef4fcbc1af999ac1b1944
This commit is contained in:
Mark Doffman 2016-05-11 13:04:30 -05:00 committed by Dan Smith
parent ffe8b92a6f
commit 7f82c5e681
11 changed files with 328 additions and 36 deletions

View File

@ -4,8 +4,8 @@
"nova_object.data": {
"exception": "AggregateNameExists",
"exception_message": "Aggregate versioned_exc_aggregate already exists.",
"function_name": "aggregate_create",
"module_name": "nova.db.sqlalchemy.api"
"function_name": "_aggregate_create_in_db",
"module_name": "nova.objects.aggregate"
},
"nova_object.name": "ExceptionPayload",
"nova_object.namespace": "nova",

View File

@ -71,6 +71,9 @@ class AggregateController(wsgi.Controller):
aggregate = self.api.create_aggregate(context, name, avail_zone)
except exception.AggregateNameExists as e:
raise exc.HTTPConflict(explanation=e.format_message())
except exception.ObjectActionError:
raise exc.HTTPConflict(explanation=_(
'Not all aggregates have been migrated to the API database'))
except exception.InvalidAggregateAction as e:
raise exc.HTTPBadRequest(explanation=e.format_message())

View File

@ -74,9 +74,11 @@ class Aggregate(API_BASE):
uuid = Column(String(36))
name = Column(String(255))
_hosts = orm.relationship(AggregateHost,
primaryjoin='Aggregate.id == AggregateHost.aggregate_id')
primaryjoin='Aggregate.id == AggregateHost.aggregate_id',
cascade='delete')
_metadata = orm.relationship(AggregateMetadata,
primaryjoin='Aggregate.id == AggregateMetadata.aggregate_id')
primaryjoin='Aggregate.id == AggregateMetadata.aggregate_id',
cascade='delete')
@property
def _extra_keys(self):

View File

@ -23,8 +23,9 @@ from nova.compute import utils as compute_utils
from nova import db
from nova.db.sqlalchemy import api as db_api
from nova.db.sqlalchemy import api_models
from nova.db.sqlalchemy import models as main_models
from nova import exception
from nova.i18n import _
from nova.i18n import _, _LW
from nova import objects
from nova.objects import base
from nova.objects import fields
@ -159,6 +160,45 @@ def _metadata_delete_from_db(context, aggregate_id, key):
aggregate_id=aggregate_id, metadata_key=key)
@db_api.api_context_manager.writer
def _aggregate_create_in_db(context, values, metadata=None):
query = context.session.query(api_models.Aggregate)
query = query.filter(api_models.Aggregate.name == values['name'])
aggregate = query.first()
if not aggregate:
aggregate = api_models.Aggregate()
aggregate.update(values)
aggregate.save(context.session)
# We don't want these to be lazy loaded later. We know there is
# nothing here since we just created this aggregate.
aggregate._hosts = []
aggregate._metadata = []
else:
raise exception.AggregateNameExists(aggregate_name=values['name'])
if metadata:
_metadata_add_to_db(context, aggregate.id, metadata)
context.session.expire(aggregate, ['_metadata'])
aggregate._metadata
return aggregate
@db_api.api_context_manager.writer
def _aggregate_delete_from_db(context, aggregate_id):
# Delete Metadata first
context.session.query(api_models.AggregateMetadata).\
filter_by(aggregate_id=aggregate_id).\
delete()
count = context.session.query(api_models.Aggregate).\
filter(api_models.Aggregate.id == aggregate_id).\
delete()
if count == 0:
raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@db_api.api_context_manager.writer
def _aggregate_update_to_db(context, aggregate_id, values):
aggregate = _aggregate_get_from_db(context, aggregate_id)
@ -285,11 +325,31 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
db_aggregate = db.aggregate_get_by_uuid(context, aggregate_uuid)
return cls._from_db_object(context, cls(), db_aggregate)
@staticmethod
@db_api.main_context_manager.reader
def _ensure_migrated(context):
result = context.session.query(main_models.Aggregate).\
filter_by(deleted=0).count()
if result:
LOG.warning(
_LW('Main database contains %(count)i unmigrated aggregates'),
{'count': result})
return result == 0
@base.remotable
def create(self):
if self.obj_attr_is_set('id'):
raise exception.ObjectActionError(action='create',
reason='already created')
# NOTE(mdoff): Once we have made it past a point where we know
# all aggregates have been migrated, we can remove this. Ideally
# in Ocata with a blocker migration to be sure.
if not self._ensure_migrated(self._context):
raise exception.ObjectActionError(
action='create',
reason='main database still contains aggregates')
self._assert_no_hosts('create')
updates = self.obj_get_changes()
payload = dict(updates)
@ -304,8 +364,8 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
"create.start",
payload)
metadata = updates.pop('metadata', None)
db_aggregate = db.aggregate_create(self._context, updates,
metadata=metadata)
db_aggregate = _aggregate_create_in_db(self._context, updates,
metadata=metadata)
self._from_db_object(self._context, self, db_aggregate)
payload['aggregate_id'] = self.id
compute_utils.notify_about_aggregate_update(self._context,
@ -372,7 +432,10 @@ class Aggregate(base.NovaPersistentObject, base.NovaObject):
@base.remotable
def destroy(self):
db.aggregate_delete(self._context, self.id)
try:
_aggregate_delete_from_db(self._context, self.id)
except exception.AggregateNotFound:
db.aggregate_delete(self._context, self.id)
@base.remotable
def add_host(self, host):

View File

@ -23,6 +23,7 @@ from nova import exception
from nova import test
from nova.tests import fixtures
from nova.tests.unit import matchers
from nova.tests.unit.objects.test_objects import compare_obj as base_compare
from nova.tests import uuidsentinel
import nova.objects.aggregate as aggregate_obj
@ -205,6 +206,65 @@ class AggregateObjectDbTestCase(test.NoDBTestCase):
key='goodkey')
self.assertEqual(2, len(rl1))
def test_aggregate_create_in_db(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
agg = aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate)
result = aggregate_obj._aggregate_get_from_db(self.context,
agg.id)
self.assertEqual(result.name, fake_create_aggregate['name'])
def test_aggregate_create_in_db_with_metadata(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
agg = aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate,
metadata={'goodkey': 'good'})
result = aggregate_obj._aggregate_get_from_db(self.context,
agg.id)
md = aggregate_obj._get_by_metadata_key_from_db(self.context,
key='goodkey')
self.assertEqual(len(md), 1)
self.assertEqual(md[0]['id'], agg.id)
self.assertEqual(result.name, fake_create_aggregate['name'])
def test_aggregate_create_raise_exist_exc(self):
fake_create_aggregate = {
'name': 'fake-aggregate',
}
aggregate_obj._aggregate_create_in_db(self.context,
fake_create_aggregate)
self.assertRaises(exception.AggregateNameExists,
aggregate_obj._aggregate_create_in_db,
self.context,
fake_create_aggregate,
metadata=None)
def test_aggregate_delete(self):
result = _create_aggregate(self.context, metadata=None)
aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_get_from_db,
self.context, result['id'])
def test_aggregate_delete_raise_not_found(self):
# this does not exist!
aggregate_id = 45
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_delete_from_db,
self.context, aggregate_id)
def test_aggregate_delete_with_metadata(self):
result = _create_aggregate(self.context,
metadata={'availability_zone': 'fake_avail_zone'})
aggregate_obj._aggregate_delete_from_db(self.context, result['id'])
self.assertRaises(exception.AggregateNotFound,
aggregate_obj._aggregate_get_from_db,
self.context, result['id'])
def test_aggregate_update(self):
created = _create_aggregate(self.context,
metadata={'availability_zone': 'fake_avail_zone'})
@ -408,3 +468,128 @@ class AggregateObjectDbTestCase(test.NoDBTestCase):
self.assertRaises(exception.AggregateMetadataNotFound,
aggregate_obj._metadata_delete_from_db,
self.context, result['id'], 'foo_key')
def create_aggregate(context, db_id, in_api=True):
if in_api:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
aggregate_obj._aggregate_create_in_db(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
aggregate_obj._host_add_to_db(context, fake_aggregate['id'], host)
else:
fake_aggregate = _get_fake_aggregate(db_id, in_api=False, result=False)
db.aggregate_create(context, fake_aggregate,
metadata=_get_fake_metadata(db_id))
for host in _get_fake_hosts(db_id):
db.aggregate_host_add(context, fake_aggregate['id'], host)
def compare_obj(test, result, source):
source['deleted'] = False
def updated_at_comparator(result, source):
return True
return base_compare(test, result, source, subs=SUBS,
comparators={'updated_at': updated_at_comparator})
class AggregateObjectCellTestCase(test.NoDBTestCase):
"""Tests for the case where all aggregate data is in Cell DB"""
USES_DB_SELF = True
def setUp(self):
super(AggregateObjectCellTestCase, self).setUp()
self.context = context.RequestContext('fake-user', 'fake-project')
self.useFixture(fixtures.Database())
self.useFixture(fixtures.Database(database='api'))
self._seed_data()
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i, in_api=False)
def test_get_by_id(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, _get_fake_aggregate(i))
def test_save(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['name'] = 'new-name' + str(i)
agg.name = 'new-name' + str(i)
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_update_metadata(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['metadetails'] = {'constant_key': 'constant_value'}
agg.update_metadata({'unique_key': None})
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_destroy(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
agg.destroy()
aggs = aggregate_obj.AggregateList.get_all(self.context)
self.assertEqual(len(aggs), 0)
def test_add_host(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['hosts'].append('barbar')
agg.add_host('barbar')
agg.save()
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
def test_delete_host(self):
for i in range(1, 10):
agg = aggregate_obj.Aggregate.get_by_id(self.context, i)
fake_agg = _get_fake_aggregate(i)
fake_agg['hosts'].remove('constant_host')
agg.delete_host('constant_host')
result = aggregate_obj.Aggregate.get_by_id(self.context, i)
compare_obj(self, agg, fake_agg)
compare_obj(self, result, fake_agg)
class AggregateObjectApiTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where all data is in the API DB"""
def _seed_data(self):
for i in range(1, 10):
create_aggregate(self.context, i)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
new_agg.create()
result = aggregate_obj.Aggregate.get_by_id(self.context, new_agg.id)
self.assertEqual(new_agg.name, result.name)
class AggregateObjectMixedTestCase(AggregateObjectCellTestCase):
"""Tests the aggregate in the case where data is in both databases"""
def _seed_data(self):
for i in range(1, 6):
create_aggregate(self.context, i)
for i in range(6, 10):
create_aggregate(self.context, i, in_api=False)
def test_create(self):
new_agg = aggregate_obj.Aggregate(self.context)
new_agg.name = 'new-aggregate'
self.assertRaises(exception.ObjectActionError,
new_agg.create)

View File

@ -154,6 +154,17 @@ class AggregateTestCaseV21(test.NoDBTestCase):
{"name": "test",
"availability_zone": "nova1"}})
@mock.patch.object(compute_api.AggregateAPI, 'create_aggregate')
def test_create_with_unmigrated_aggregates(self, mock_create_aggregate):
mock_create_aggregate.side_effect = \
exception.ObjectActionError(action='create',
reason='main database still contains aggregates')
self.assertRaises(exc.HTTPConflict, self.controller.create,
self.req, body={"aggregate":
{"name": "test",
"availability_zone": "nova1"}})
def test_create_with_incorrect_availability_zone(self):
def stub_create_aggregate(context, name, availability_zone):
raise exception.InvalidAggregateAction(action='create_aggregate',

View File

@ -10260,8 +10260,6 @@ class ComputeAPIAggrTestCase(BaseTestCase):
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
None)
self.api.delete_aggregate(self.context, aggr.id)
db.aggregate_get(self.context.elevated(read_deleted='yes'),
aggr.id)
self.assertRaises(exception.AggregateNotFound,
self.api.delete_aggregate, self.context, aggr.id)
@ -10544,8 +10542,6 @@ class ComputeAPIAggrTestCase(BaseTestCase):
msg = fake_notifier.NOTIFICATIONS[1]
self.assertEqual(msg.event_type,
'aggregate.delete.end')
db.aggregate_get(self.context.elevated(read_deleted='yes'),
aggr.id)
self.assertRaises(exception.AggregateNotFound,
self.api.delete_aggregate, self.context, aggr.id)

View File

@ -261,8 +261,7 @@ class TestNewtonCheck(test.TestCase):
self.migration.upgrade, self.engine)
def test_aggregate_not_migrated(self):
agg = objects.Aggregate(context=self.context, name='foo')
agg.create()
agg = db_api.aggregate_create(self.context, {"name": "foobar"})
db_api.aggregate_update(self.context, agg.id, {'uuid': None})
self.assertRaises(exception.ValidationError,
self.migration.upgrade, self.engine)

View File

@ -110,25 +110,30 @@ class _TestAggregateObject(object):
self.assertEqual(fake_aggregate['id'], agg.id)
self.assertFalse(get_by_uuid.called)
@mock.patch.object(db, 'aggregate_create')
def test_create(self, mock_aggregate_create):
mock_aggregate_create.return_value = fake_aggregate
@mock.patch('nova.objects.aggregate._aggregate_create_in_db')
@mock.patch('nova.db.aggregate_create')
def test_create(self, create_mock, api_create_mock):
api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo'
agg.metadata = {'one': 'two'}
agg.uuid = uuidsentinel.fake_agg
agg.create()
api_create_mock.assert_called_once_with(
self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'})
self.assertFalse(create_mock.called)
self.compare_obj(agg, fake_aggregate, subs=SUBS)
mock_aggregate_create.assert_called_once_with(self.context,
api_create_mock.assert_called_once_with(self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_create_in_db')
@mock.patch.object(db, 'aggregate_create')
def test_recreate_fails(self, mock_aggregate_create):
mock_aggregate_create.return_value = fake_aggregate
def test_recreate_fails(self, create_mock, api_create_mock):
api_create_mock.return_value = fake_aggregate
agg = aggregate.Aggregate(context=self.context)
agg.name = 'foo'
agg.metadata = {'one': 'two'}
@ -136,10 +141,30 @@ class _TestAggregateObject(object):
agg.create()
self.assertRaises(exception.ObjectActionError, agg.create)
mock_aggregate_create.assert_called_once_with(self.context,
api_create_mock.assert_called_once_with(self.context,
{'name': 'foo', 'uuid': uuidsentinel.fake_agg},
metadata={'one': 'two'})
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
@mock.patch('nova.db.aggregate_delete')
def test_destroy(self, delete_mock, api_delete_mock):
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
self.assertFalse(delete_mock.called)
api_delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_delete_from_db')
@mock.patch('nova.db.aggregate_delete')
def test_destroy_cell(self, delete_mock, api_delete_mock):
api_delete_mock.side_effect = exception.AggregateNotFound(
aggregate_id=123)
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
delete_mock.assert_called_with(self.context, 123)
api_delete_mock.assert_called_with(self.context, 123)
@mock.patch('nova.objects.aggregate._aggregate_update_to_db')
@mock.patch('nova.db.aggregate_update')
def test_save(self, update_mock, api_update_mock):
@ -259,14 +284,6 @@ class _TestAggregateObject(object):
123,
{'toadd': 'myval'})
@mock.patch.object(db, 'aggregate_delete')
def test_destroy(self, mock_aggregate_delete):
agg = aggregate.Aggregate(context=self.context)
agg.id = 123
agg.destroy()
mock_aggregate_delete.assert_called_once_with(self.context, 123)
@mock.patch.object(db, 'aggregate_host_add')
def test_add_host(self, mock_host_add):
mock_host_add.return_value = {'host': 'bar'}

View File

@ -3032,10 +3032,10 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
aggregate = self._aggregate_setup()
self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
result = db.aggregate_get(self.context, aggregate.id)
result = objects.Aggregate.get_by_id(self.context, aggregate.id)
self.assertTrue(fake_init_pool.called)
self.assertThat(self.fake_metadata,
matchers.DictMatches(result['metadetails']))
matchers.DictMatches(result.metadata))
def test_join_slave(self):
# Ensure join_slave gets called when the request gets to master.
@ -3111,12 +3111,12 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
aggregate = self._aggregate_setup(metadata=self.fake_metadata)
self.conn._pool.remove_from_aggregate(self.context, aggregate, "host")
result = db.aggregate_get(self.context, aggregate.id)
result = objects.Aggregate.get_by_id(self.context, aggregate.id)
self.assertTrue(fake_clear_pool.called)
self.assertThat({'availability_zone': 'fake_zone',
pool_states.POOL_FLAG: 'XenAPI',
pool_states.KEY: pool_states.ACTIVE},
matchers.DictMatches(result['metadetails']))
matchers.DictMatches(result.metadata))
def test_remote_master_non_empty_pool(self):
# Ensure AggregateError is raised if removing the master.
@ -3183,7 +3183,9 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
# let's mock the fact that the aggregate is ready!
metadata = {pool_states.POOL_FLAG: "XenAPI",
pool_states.KEY: pool_states.ACTIVE}
db.aggregate_metadata_add(self.context, aggr.id, metadata)
self.api.update_aggregate_metadata(self.context,
aggr.id,
metadata)
for aggregate_host in values[fake_zone]:
aggr = self.api.add_host_to_aggregate(self.context,
aggr.id, aggregate_host)

View File

@ -0,0 +1,14 @@
---
upgrade:
- Aggregates are being moved to the API database for CellsV2. In this
release, the online data migrations will move any aggregates you have
in your main database to the API database, retaining all
attributes. Until this is complete, new attempts to create aggregates
will return an HTTP 409 to avoid creating aggregates in one place that
may conflict with aggregates you already have and are yet to be
migrated.
- Note that aggregates can no longer be soft-deleted as the API
database does not replicate the legacy soft-delete functionality
from the main database. As such, deleted aggregates are not migrated
and the behavior users will experience will be the same as if a
purge of deleted records was performed.