Merge "placement: use single-shot INSERT/DELETE agg"
This commit is contained in:
commit
6ce16e329a
|
@ -416,12 +416,16 @@ def _get_provider_by_uuid(context, uuid):
|
||||||
|
|
||||||
@db_api.placement_context_manager.reader
|
@db_api.placement_context_manager.reader
|
||||||
def _get_aggregates_by_provider_id(context, rp_id):
|
def _get_aggregates_by_provider_id(context, rp_id):
|
||||||
|
"""Returns a dict, keyed by internal aggregate ID, of aggregate UUIDs
|
||||||
|
associated with the supplied internal resource provider ID.
|
||||||
|
"""
|
||||||
join_statement = sa.join(
|
join_statement = sa.join(
|
||||||
_AGG_TBL, _RP_AGG_TBL, sa.and_(
|
_AGG_TBL, _RP_AGG_TBL, sa.and_(
|
||||||
_AGG_TBL.c.id == _RP_AGG_TBL.c.aggregate_id,
|
_AGG_TBL.c.id == _RP_AGG_TBL.c.aggregate_id,
|
||||||
_RP_AGG_TBL.c.resource_provider_id == rp_id))
|
_RP_AGG_TBL.c.resource_provider_id == rp_id))
|
||||||
sel = sa.select([_AGG_TBL.c.uuid]).select_from(join_statement)
|
sel = sa.select([_AGG_TBL.c.id, _AGG_TBL.c.uuid]).select_from(
|
||||||
return [r[0] for r in context.session.execute(sel).fetchall()]
|
join_statement)
|
||||||
|
return {r[0]: r[1] for r in context.session.execute(sel).fetchall()}
|
||||||
|
|
||||||
|
|
||||||
@db_api.placement_context_manager.reader
|
@db_api.placement_context_manager.reader
|
||||||
|
@ -484,6 +488,36 @@ def _anchors_for_sharing_providers(context, rp_ids, get_id=False):
|
||||||
return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
|
return set([(r[0], r[1]) for r in context.session.execute(sel).fetchall()])
|
||||||
|
|
||||||
|
|
||||||
|
@db_api.placement_context_manager.independent.writer
|
||||||
|
def _ensure_aggregate(ctx, agg_uuid):
|
||||||
|
"""Finds an aggregate and returns its internal ID. If not found, creates
|
||||||
|
the aggregate and returns the new aggregate's internal ID.
|
||||||
|
"""
|
||||||
|
sel = sa.select([_AGG_TBL.c.id]).where(_AGG_TBL.c.uuid == agg_uuid)
|
||||||
|
res = ctx.session.execute(sel).fetchone()
|
||||||
|
if res:
|
||||||
|
return res[0]
|
||||||
|
|
||||||
|
LOG.debug("_ensure_aggregate() did not find aggregate %s. "
|
||||||
|
"Creating it.", agg_uuid)
|
||||||
|
try:
|
||||||
|
ins_stmt = _AGG_TBL.insert().values(uuid=agg_uuid)
|
||||||
|
res = ctx.session.execute(ins_stmt)
|
||||||
|
agg_id = res.inserted_primary_key[0]
|
||||||
|
LOG.debug("_ensure_aggregate() created new aggregate %s (id=%d).",
|
||||||
|
agg_uuid, agg_id)
|
||||||
|
return agg_id
|
||||||
|
except db_exc.DBDuplicateEntry:
|
||||||
|
# Something else added this agg_uuid in between our initial
|
||||||
|
# fetch above and when we tried flushing this session, so let's
|
||||||
|
# grab whatever that other thing added.
|
||||||
|
LOG.debug("_ensure_provider() failed to create new aggregate %s. "
|
||||||
|
"Another thread already created an aggregate record. "
|
||||||
|
"Looking up that aggregate record.",
|
||||||
|
agg_uuid)
|
||||||
|
return _ensure_aggregate(ctx, agg_uuid)
|
||||||
|
|
||||||
|
|
||||||
@db_api.placement_context_manager.writer
|
@db_api.placement_context_manager.writer
|
||||||
def _set_aggregates(context, resource_provider, provided_aggregates,
|
def _set_aggregates(context, resource_provider, provided_aggregates,
|
||||||
increment_generation=False):
|
increment_generation=False):
|
||||||
|
@ -498,9 +532,17 @@ def _set_aggregates(context, resource_provider, provided_aggregates,
|
||||||
# to avoid bloat if it turns out we're creating a lot of noise.
|
# to avoid bloat if it turns out we're creating a lot of noise.
|
||||||
# Not doing now to move things along.
|
# Not doing now to move things along.
|
||||||
provided_aggregates = set(provided_aggregates)
|
provided_aggregates = set(provided_aggregates)
|
||||||
existing_aggregates = set(_get_aggregates_by_provider_id(context, rp_id))
|
existing_aggregates = _get_aggregates_by_provider_id(context, rp_id)
|
||||||
to_add = provided_aggregates - existing_aggregates
|
agg_uuids_to_add = provided_aggregates - set(existing_aggregates.values())
|
||||||
target_aggregates = list(provided_aggregates)
|
# A dict, keyed by internal aggregate ID, of aggregate UUIDs that will be
|
||||||
|
# associated with the provider
|
||||||
|
aggs_to_associate = {}
|
||||||
|
# Same dict for those aggregates to remove the association with this
|
||||||
|
# provider
|
||||||
|
aggs_to_disassociate = {
|
||||||
|
agg_id: agg_uuid for agg_id, agg_uuid in existing_aggregates.items()
|
||||||
|
if agg_uuid not in provided_aggregates
|
||||||
|
}
|
||||||
|
|
||||||
# Create any aggregates that do not yet exist in
|
# Create any aggregates that do not yet exist in
|
||||||
# PlacementAggregates. This is different from
|
# PlacementAggregates. This is different from
|
||||||
|
@ -511,38 +553,32 @@ def _set_aggregates(context, resource_provider, provided_aggregates,
|
||||||
# create a new row in the PlacementAggregate table if the
|
# create a new row in the PlacementAggregate table if the
|
||||||
# aggregate uuid has never been seen before. Code further
|
# aggregate uuid has never been seen before. Code further
|
||||||
# below will update the associations.
|
# below will update the associations.
|
||||||
for agg_uuid in to_add:
|
for agg_uuid in agg_uuids_to_add:
|
||||||
found_agg = context.session.query(models.PlacementAggregate.uuid).\
|
agg_id = _ensure_aggregate(context, agg_uuid)
|
||||||
filter_by(uuid=agg_uuid).first()
|
aggs_to_associate[agg_id] = agg_uuid
|
||||||
if not found_agg:
|
|
||||||
new_aggregate = models.PlacementAggregate(uuid=agg_uuid)
|
|
||||||
try:
|
|
||||||
context.session.add(new_aggregate)
|
|
||||||
# Flush each aggregate to explicitly call the INSERT
|
|
||||||
# statement that could result in an integrity error
|
|
||||||
# if some other thread has added this agg_uuid. This
|
|
||||||
# also makes sure that the new aggregates have
|
|
||||||
# ids when the SELECT below happens.
|
|
||||||
context.session.flush()
|
|
||||||
except db_exc.DBDuplicateEntry:
|
|
||||||
# Something else has already added this agg_uuid
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Remove all aggregate associations so we can refresh them
|
for agg_id, agg_uuid in aggs_to_associate.items():
|
||||||
# below. This means that all associations are added, but the
|
try:
|
||||||
# aggregates themselves stay around.
|
ins_stmt = _RP_AGG_TBL.insert().values(
|
||||||
context.session.query(models.ResourceProviderAggregate).filter_by(
|
resource_provider_id=rp_id, aggregate_id=agg_id)
|
||||||
resource_provider_id=rp_id).delete()
|
context.session.execute(ins_stmt)
|
||||||
|
LOG.debug("Setting aggregates for provider %s. Successfully "
|
||||||
|
"associated aggregate %s.",
|
||||||
|
resource_provider.uuid, agg_uuid)
|
||||||
|
except db_exc.DBDuplicateEntry:
|
||||||
|
LOG.debug("Setting aggregates for provider %s. Another thread "
|
||||||
|
"already associated aggregate %s. Skipping.",
|
||||||
|
resource_provider.uuid, agg_uuid)
|
||||||
|
pass
|
||||||
|
|
||||||
# Set resource_provider_id, aggregate_id pairs to
|
for agg_id, agg_uuid in aggs_to_disassociate.items():
|
||||||
# ResourceProviderAggregate table.
|
del_stmt = _RP_AGG_TBL.delete().where(sa.and_(
|
||||||
if target_aggregates:
|
_RP_AGG_TBL.c.resource_provider_id == rp_id,
|
||||||
select_agg_id = sa.select([rp_id, models.PlacementAggregate.id]).\
|
_RP_AGG_TBL.c.aggregate_id == agg_id))
|
||||||
where(models.PlacementAggregate.uuid.in_(target_aggregates))
|
context.session.execute(del_stmt)
|
||||||
insert_aggregates = models.ResourceProviderAggregate.__table__.\
|
LOG.debug("Setting aggregates for provider %s. Successfully "
|
||||||
insert().from_select(['resource_provider_id', 'aggregate_id'],
|
"disassociated aggregate %s.",
|
||||||
select_agg_id)
|
resource_provider.uuid, agg_uuid)
|
||||||
context.session.execute(insert_aggregates)
|
|
||||||
|
|
||||||
if increment_generation:
|
if increment_generation:
|
||||||
resource_provider.generation = _increment_provider_generation(
|
resource_provider.generation = _increment_provider_generation(
|
||||||
|
@ -939,7 +975,8 @@ class ResourceProvider(base.VersionedObject, base.TimestampedObject):
|
||||||
|
|
||||||
def get_aggregates(self):
|
def get_aggregates(self):
|
||||||
"""Get the aggregate uuids associated with this resource provider."""
|
"""Get the aggregate uuids associated with this resource provider."""
|
||||||
return _get_aggregates_by_provider_id(self._context, self.id)
|
return list(
|
||||||
|
_get_aggregates_by_provider_id(self._context, self.id).values())
|
||||||
|
|
||||||
def set_aggregates(self, aggregate_uuids, increment_generation=False):
|
def set_aggregates(self, aggregate_uuids, increment_generation=False):
|
||||||
"""Set the aggregate uuids associated with this resource provider.
|
"""Set the aggregate uuids associated with this resource provider.
|
||||||
|
|
Loading…
Reference in New Issue