Adds aggregates DB model and API

This adds the aggregates DB model with migrations and the related
API.

Partially Implements: bp node-aggregate

Change-Id: I019a9a4a5483750dc42a8cbf714a9d194d37fae8
This commit is contained in:
Zhenguo Niu 2017-07-12 11:41:17 +08:00
parent 4de16ca525
commit 9868e1a031
5 changed files with 278 additions and 0 deletions

View File

@ -194,6 +194,19 @@ class ComputePortNotAvailable(NotFound):
_msg_fmt = _("No available compute ports.")
class AggregateNameExists(Conflict):
    """Raised when creating an aggregate whose name is already in use."""

    _msg_fmt = _("Aggregate %(name)s already exists.")
class AggregateNotFound(NotFound):
    """Raised when the requested aggregate does not exist."""

    _msg_fmt = _("Aggregate %(aggregate)s could not be found.")
class AggregateMetadataNotFound(NotFound):
    """Raised when an aggregate has no metadata entry for the given key."""

    _msg_fmt = _("Aggregate %(aggregate_id)s metadata with key %(key)s "
                 "could not be found.")
class NodeNotFound(NotFound):
    """Raised when no node is associated with the given server."""

    _msg_fmt = _("Node associated with server %(server)s "
                 "could not be found.")

View File

@ -196,3 +196,50 @@ class Connection(object):
def key_pair_count_by_user(self, context, user_id):
    """Count number of key pairs for the given user ID.

    :param context: request context
    :param user_id: ID of the user whose key pairs are counted
    :returns: the number of key pairs owned by the user
    """
    return IMPL.key_pair_count_by_user(context, user_id)
@abc.abstractmethod
def aggregate_create(self, context, values):
    """Create an aggregate from the values dictionary.

    :param context: request context
    :param values: dict of fields for the new aggregate
    :returns: the created aggregate
    """
    return IMPL.aggregate_create(context, values)
@abc.abstractmethod
def aggregate_update(self, context, aggregate_id, values):
    """Update an aggregate from the values dictionary.

    :param context: request context
    :param aggregate_id: identifier of the aggregate to update
    :param values: dict of fields to apply
    :returns: the updated aggregate
    """
    return IMPL.aggregate_update(context, aggregate_id, values)
@abc.abstractmethod
def aggregate_get(self, context, aggregate_id):
    """Get an aggregate or raise if it does not exist.

    :param context: request context
    :param aggregate_id: identifier of the aggregate to fetch
    :returns: the matching aggregate
    """
    return IMPL.aggregate_get(context, aggregate_id)
@abc.abstractmethod
def aggregate_get_all(self, context):
    """Get all aggregates.

    :param context: request context
    :returns: a list of all aggregates
    """
    return IMPL.aggregate_get_all(context)
@abc.abstractmethod
def aggregate_destroy(self, context, aggregate_id):
    """Destroy the aggregate or raise if it does not exist.

    :param context: request context
    :param aggregate_id: identifier of the aggregate to delete
    """
    return IMPL.aggregate_destroy(context, aggregate_id)
@abc.abstractmethod
def aggregate_get_by_metadata_key(self, context, key):
    """Get a list of aggregates by metadata key.

    :param context: request context
    :param key: metadata key to match
    :returns: aggregates that carry the given metadata key
    """
    return IMPL.aggregate_get_by_metadata_key(context, key)
@abc.abstractmethod
def aggregate_metadata_update_or_create(self, context, aggregate_id,
                                        metadata):
    """Update/Create aggregates metadata.

    :param context: request context
    :param aggregate_id: identifier of the aggregate to update
    :param metadata: dict of metadata keys/values to set
    """
    return IMPL.aggregate_metadata_update_or_create(context, aggregate_id,
                                                    metadata)
@abc.abstractmethod
def aggregate_metadata_get(self, context, aggregate_id):
    """Get aggregate metadata by aggregate id.

    :param context: request context
    :param aggregate_id: identifier of the aggregate
    :returns: the aggregate's metadata
    """
    return IMPL.aggregate_metadata_get(context, aggregate_id)
@abc.abstractmethod
def aggregate_metadata_delete(self, context, aggregate_id, key):
    """Delete aggregate metadata by aggregate id and key.

    :param context: request context
    :param aggregate_id: identifier of the aggregate owning the metadata
    :param key: metadata key to delete
    """
    # The concrete driver's aggregate_metadata_delete takes
    # (context, aggregate_id, key); the previous (context, key)
    # signature could never delegate successfully.
    return IMPL.aggregate_metadata_delete(context, aggregate_id, key)

View File

@ -189,3 +189,30 @@ def upgrade():
mysql_engine='InnoDB',
mysql_charset='utf8'
)
# Aggregates: a named grouping of nodes, addressed externally by uuid.
op.create_table(
    'aggregates',
    sa.Column('created_at', sa.DateTime(), nullable=True),
    sa.Column('updated_at', sa.DateTime(), nullable=True),
    sa.Column('id', sa.Integer(), primary_key=True, nullable=False),
    sa.Column('uuid', sa.String(length=36), nullable=False),
    sa.Column('name', sa.String(length=255), nullable=False),
    sa.PrimaryKeyConstraint('id'),
    # Use the same spelling as the other tables in this migration;
    # mysql_ENGINE / mysql_DEFAULT_CHARSET were inconsistent with them.
    mysql_engine='InnoDB',
    mysql_charset='utf8'
)
# Key/value metadata attached to an aggregate.
op.create_table(
    'aggregate_metadata',
    sa.Column('created_at', sa.DateTime(), nullable=True),
    sa.Column('updated_at', sa.DateTime(), nullable=True),
    sa.Column('id', sa.Integer(), primary_key=True, nullable=False),
    sa.Column('key', sa.String(length=255), nullable=False),
    sa.Column('value', sa.String(length=255), nullable=False),
    sa.Column('aggregate_id', sa.Integer(), nullable=False),
    sa.PrimaryKeyConstraint('id'),
    sa.ForeignKeyConstraint(['aggregate_id'],
                            ['aggregates.id']),
    # Declared on the AggregateMetadata model; the DB driver relies on
    # DBDuplicateEntry from this constraint when racing metadata writes.
    sa.UniqueConstraint(
        'aggregate_id', 'key',
        name='uniq_aggregate_metadata0aggregate_id0key'),
    mysql_engine='InnoDB',
    mysql_charset='utf8'
)
# Match the indexes declared on the models.
op.create_index('aggregate_uuid_idx', 'aggregates',
                ['uuid'], unique=False)
op.create_index('aggregate_metadata_key_idx', 'aggregate_metadata',
                ['key'], unique=False)

View File

@ -26,7 +26,9 @@ from oslo_utils import strutils
from oslo_utils import timeutils
from oslo_utils import uuidutils
from sqlalchemy import or_
from sqlalchemy.orm import contains_eager
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql import true
@ -736,6 +738,153 @@ class Connection(api.Connection):
return model_query(context, models.KeyPair).filter_by(
user_id=user_id).count()
@oslo_db_api.retry_on_deadlock
def aggregate_create(self, context, values):
    """Create an aggregate (and optional metadata) from *values*.

    :param context: request context
    :param values: aggregate fields; an optional 'metadata' dict is
        popped off and stored via aggregate_metadata_update_or_create
    :raises: AggregateNameExists if an aggregate with the same name
        already exists
    :returns: the created Aggregate model with ``_metadata`` populated
    """
    # Generate a uuid server-side when the caller did not supply one.
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    metadata = values.pop('metadata', None)
    with _session_for_write() as session:
        # Name-uniqueness is enforced by this check-then-insert; a
        # concurrent create with the same name could still race here.
        query = model_query(context, models.Aggregate).filter_by(
            name=values['name']).options(joinedload("_metadata"))
        aggregate = query.first()
        if not aggregate:
            aggregate = models.Aggregate()
            aggregate.update(values)
            aggregate.save(session=session)
            # We don't want these to be lazy loaded later. We know there
            # is nothing here since we just created this aggregate.
            aggregate._metadata = []
        else:
            raise exception.AggregateNameExists(name=values['name'])
        if metadata:
            self.aggregate_metadata_update_or_create(
                context, aggregate.id, metadata)
            # NOTE(pkholkin): '_metadata' attribute was updated during
            # 'aggregate_metadata_update_or_create' method, so it should
            # be expired and read from db
            session.expire(aggregate, ['_metadata'])
            # Bare attribute access forces the expired relationship to
            # reload while the session is still open.
            aggregate._metadata
        return aggregate
def aggregate_get(self, context, aggregate_id):
    """Return the aggregate matching *aggregate_id* (a uuid).

    Metadata rows are eager-loaded so the caller can use them after
    the session closes.  Raises AggregateNotFound on a miss.
    """
    try:
        return model_query(context, models.Aggregate).filter_by(
            uuid=aggregate_id).options(
            joinedload("_metadata")).one()
    except NoResultFound:
        raise exception.AggregateNotFound(aggregate=aggregate_id)
def aggregate_update(self, context, aggregate_id, values):
    """Update an aggregate with *values*.

    :param context: request context
    :param aggregate_id: identifier of the aggregate to update
    :param values: dict of fields to apply; must not contain 'uuid'
    :raises: InvalidParameterValue if the caller tries to change the uuid
    :raises: DuplicateName if the new name collides with an existing one
    :returns: the updated aggregate row
    """
    if 'uuid' in values:
        msg = _("Cannot overwrite UUID for an existing aggregate.")
        raise exception.InvalidParameterValue(err=msg)
    try:
        return self._do_update_aggregate(context, aggregate_id, values)
    except db_exc.DBDuplicateEntry as e:
        if 'name' in e.columns:
            raise exception.DuplicateName(name=values['name'])
        # A duplicate on some other column: propagate the original error.
        # (Previously this fell through and 'return result' raised
        # UnboundLocalError, masking the real failure.)
        raise
@oslo_db_api.retry_on_deadlock
def _do_update_aggregate(self, context, aggregate_id, values):
    """Apply *values* to one aggregate row under a row lock.

    Raises AggregateNotFound when no row matches *aggregate_id*.
    """
    with _session_for_write():
        aggregate_query = add_identity_filter(
            model_query(context, models.Aggregate), aggregate_id)
        try:
            aggregate_ref = aggregate_query.with_lockmode('update').one()
        except NoResultFound:
            raise exception.AggregateNotFound(aggregate=aggregate_id)
        aggregate_ref.update(values)
        return aggregate_ref
def aggregate_get_all(self, context):
    """Return every aggregate, with metadata rows eager-loaded."""
    query = model_query(context, models.Aggregate)
    return query.options(joinedload("_metadata")).all()
@oslo_db_api.retry_on_deadlock
def aggregate_destroy(self, context, aggregate_id):
    """Delete an aggregate and all of its metadata.

    Raises AggregateNotFound if no aggregate row was deleted.
    """
    with _session_for_write():
        # Delete the dependent metadata rows first so the foreign key
        # on aggregate_metadata.aggregate_id is never violated.
        model_query(context, models.AggregateMetadata).filter_by(
            aggregate_id=aggregate_id).delete()
        deleted = model_query(context, models.Aggregate).filter_by(
            id=aggregate_id).delete()
        if deleted != 1:
            raise exception.AggregateNotFound(aggregate=aggregate_id)
def aggregate_get_by_metadata_key(self, context, key):
    """Return all aggregates that carry metadata key *key*.

    The join result is reused to populate ``_metadata`` via
    contains_eager, avoiding a second query per aggregate.
    """
    return (model_query(context, models.Aggregate)
            .join("_metadata")
            .filter(models.AggregateMetadata.key == key)
            .options(contains_eager("_metadata"))
            .all())
@oslo_db_api.retry_on_deadlock
def aggregate_metadata_update_or_create(self, context, aggregate_id,
                                        metadata, max_retries=10):
    """Set metadata on an aggregate, updating existing keys in place.

    :param context: request context
    :param aggregate_id: id of the aggregate owning the metadata
    :param metadata: dict of keys/values to create or update
    :param max_retries: attempts before giving up on duplicate-key races
    :raises: DBDuplicateEntry if a concurrent writer keeps winning the
        race for ``max_retries`` consecutive attempts
    :returns: the *metadata* dict that was applied
    """
    for attempt in range(max_retries):
        try:
            # Do the whole read-modify-write in one write session so the
            # SELECT ... FOR UPDATE lock covers the inserts below.
            with _session_for_write() as session:
                rows = model_query(
                    context, models.AggregateMetadata).\
                    filter_by(aggregate_id=aggregate_id).\
                    filter(models.AggregateMetadata.key.in_(
                        metadata.keys())).with_lockmode('update').all()

                already_existing_keys = set()
                for meta_ref in rows:
                    key = meta_ref["key"]
                    meta_ref.update({"value": metadata[key]})
                    already_existing_keys.add(key)

                # Insert every key that did not already exist.  Each new
                # row must be added inside this loop; adding only the
                # last-built row would silently drop the others.
                for key, value in metadata.items():
                    if key in already_existing_keys:
                        continue
                    metadata_ref = models.AggregateMetadata()
                    metadata_ref.update({"key": key,
                                         "value": value,
                                         "aggregate_id": aggregate_id})
                    session.add(metadata_ref)
                session.flush()
            return metadata
        except db_exc.DBDuplicateEntry:
            # A concurrent transaction inserted one of our keys first.
            # Retry (the next pass will see it and do an update) unless
            # this was the last attempt, in which case re-raise instead
            # of silently returning None as the old code did.
            if attempt == max_retries - 1:
                LOG.warning("Add metadata failed for aggregate %(id)s "
                            "after %(retries)s retries",
                            {"id": aggregate_id, "retries": max_retries})
                raise
def aggregate_metadata_get(self, context, aggregate_id):
    """Return an aggregate's metadata as a plain {key: value} dict."""
    query = model_query(context, models.AggregateMetadata).filter_by(
        aggregate_id=aggregate_id)
    metadata = {}
    for row in query.all():
        metadata[row["key"]] = row["value"]
    return metadata
@oslo_db_api.retry_on_deadlock
def aggregate_metadata_delete(self, context, aggregate_id, key):
    """Delete one metadata entry from an aggregate.

    Raises AggregateMetadataNotFound when the (aggregate_id, key)
    pair does not exist.
    """
    with _session_for_write():
        deleted = (model_query(context, models.AggregateMetadata)
                   .filter_by(aggregate_id=aggregate_id)
                   .filter(models.AggregateMetadata.key == key)
                   .delete(synchronize_session=False))
        if not deleted:
            # did not find the metadata
            raise exception.AggregateMetadataNotFound(
                key=key, aggregate_id=aggregate_id)
def _get_id_from_flavor_query(context, type_id):
return model_query(context, models.Flavors). \

View File

@ -305,3 +305,45 @@ class KeyPair(Base):
public_key = Column(Text())
type = Column(Enum('ssh', 'x509', name='keypair_types'),
nullable=False, server_default='ssh')
class AggregateMetadata(Base):
    """Represents a single key/value metadata entry on an aggregate."""

    __tablename__ = 'aggregate_metadata'
    __table_args__ = (
        # A key may appear at most once per aggregate.
        schema.UniqueConstraint(
            'aggregate_id', 'key',
            name='uniq_aggregate_metadata0aggregate_id0key'),
        Index('aggregate_metadata_key_idx', 'key'),
        table_args()
    )
    id = Column(Integer, primary_key=True)
    key = Column(String(255), nullable=False)
    value = Column(String(255), nullable=False)
    aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
class Aggregate(Base):
    """Represents an aggregate: a named grouping identified by uuid."""

    __tablename__ = 'aggregates'
    __table_args__ = (
        Index('aggregate_uuid_idx', 'uuid'),
        table_args()
    )
    id = Column(Integer, primary_key=True)
    uuid = Column(String(36), nullable=False)
    name = Column(String(255), nullable=False)
    # Related AggregateMetadata rows; exposed read-only via ``metadetails``.
    _metadata = orm.relationship(
        AggregateMetadata,
        primaryjoin='and_('
        'Aggregate.id == AggregateMetadata.aggregate_id)')

    @property
    def _extra_keys(self):
        # Extra attribute name(s) to expose alongside the column fields.
        return ['metadetails']

    @property
    def metadetails(self):
        # Flatten the related metadata rows into a plain {key: value} dict.
        return {m.key: m.value for m in self._metadata}