Increment revision numbers on object changes

This adds the logic to increment the revision numbers
for objects whenever there are changes and it exposes
the revision number via a field in the API.

This is handled with a new default service plugin that
subscribes to DB events and bumps revision numbers for
any objects that were modified.

It also handles the logic for bumping the revision number
of a parent in a relationship where the children aren't
top-level neutron objects that would be tracked individually.
This is accomplished with a 'revises_on_change' attribute
on the child models that the service plugin will use to
find the parent and bump its revision.

API tests are included to test the revision numbers
added to each standard attribute enabled object.

Partially-Implements: bp/push-notifications
Change-Id: I476d3e03c8ee763cc4be6d679fe9f501eb3a19b5
This commit is contained in:
Kevin Benton 2016-03-24 22:14:58 -07:00
parent f3cde0c31b
commit 4e8cc68349
13 changed files with 416 additions and 4 deletions

View File

@ -74,6 +74,7 @@ class RouterPort(model_base.BASEV2):
sa.String(36),
sa.ForeignKey('ports.id', ondelete="CASCADE"),
primary_key=True)
revises_on_change = ('router', )
# The port_type attribute is redundant as the port table already specifies
# it in DEVICE_OWNER.However, this redundancy enables more efficient
# queries on router ports, and also prevents potential error-prone
@ -1243,10 +1244,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
self._update_fip_assoc(context, fip, floatingip_db,
self._core_plugin.get_port(
context.elevated(), fip_port_id))
floatingip_dict = self._make_floatingip_dict(floatingip_db)
if self._is_dns_integration_supported:
dns_data = self._process_dns_floatingip_update_precommit(
context, floatingip_dict)
context, floatingip_db)
floatingip_dict = self._make_floatingip_dict(floatingip_db)
if self._is_dns_integration_supported:
self._process_dns_floatingip_update_postcommit(context,
floatingip_dict,

View File

@ -176,6 +176,9 @@ class HasStandardAttributes(object):
# standard attr record is being modified, but we must call this
# for all other modifications or when relevant children are being
# modified (e.g. fixed_ips change should bump port revision)
if self.standard_attr.revision_number is None:
# this is a brand new object uncommited so we don't bump now
return
self.standard_attr.revision_number += 1

View File

@ -96,6 +96,7 @@ class IPAllocation(model_base.BASEV2):
network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id",
ondelete="CASCADE"),
nullable=False, primary_key=True)
revises_on_change = ('port', )
class Route(object):

View File

@ -73,7 +73,7 @@ class SecurityGroupPortBinding(model_base.BASEV2):
security_group_id = sa.Column(sa.String(36),
sa.ForeignKey("securitygroups.id"),
primary_key=True)
revises_on_change = ('ports', )
# Add a relationship to the Port model in order to instruct SQLAlchemy to
# eagerly load security group bindings
ports = orm.relationship(
@ -95,7 +95,7 @@ class SecurityGroupRule(model_base.HasStandardAttributes, model_base.BASEV2,
sa.ForeignKey("securitygroups.id",
ondelete="CASCADE"),
nullable=True)
revises_on_change = ('security_group', )
direction = sa.Column(sa.Enum('ingress', 'egress',
name='securitygrouprules_direction'))
ethertype = sa.Column(sa.String(40))

View File

@ -0,0 +1,53 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api import extensions
REVISION = 'revision'
REVISION_BODY = {
REVISION: {'allow_post': False, 'allow_put': False,
'is_visible': True, 'default': None},
}
RESOURCES = ('security_group_rules', 'security_groups', 'ports', 'subnets',
'networks', 'routers', 'floatingips', 'subnetpools')
EXTENDED_ATTRIBUTES_2_0 = {}
for resource in RESOURCES:
EXTENDED_ATTRIBUTES_2_0[resource] = REVISION_BODY
class Revisions(extensions.ExtensionDescriptor):
"""Extension to expose revision number of standard attr resources."""
@classmethod
def get_name(cls):
return "Resource revision numbers"
@classmethod
def get_alias(cls):
return "revisions"
@classmethod
def get_description(cls):
return ("This extension will display the revision number of neutron "
"resources.")
@classmethod
def get_updated(cls):
return "2016-04-11T10:00:00-00:00"
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}

View File

@ -45,6 +45,7 @@ DEFAULT_SERVICE_PLUGINS = {
'timestamp_core': 'timestamp_core',
'network_ip_availability': 'network-ip-availability',
'flavors': 'flavors',
'revisions': 'revisions',
}
# Service operation status constants

View File

View File

@ -0,0 +1,107 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from sqlalchemy import event
from sqlalchemy.orm import session as se
from neutron._i18n import _, _LW
from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.extensions import revisions
from neutron.services import service_base
LOG = logging.getLogger(__name__)
class RevisionPlugin(service_base.ServicePluginBase):
"""Plugin to populate revision numbers into standard attr resources."""
supported_extension_aliases = ['revisions']
def __init__(self):
super(RevisionPlugin, self).__init__()
for resource in revisions.RESOURCES:
db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
resource, [self.extend_resource_dict_revision])
event.listen(se.Session, 'before_flush', self.bump_revisions)
def bump_revisions(self, session, context, instances):
# bump revision number for any updated objects in the session
for obj in session.dirty:
if isinstance(obj, model_base.HasStandardAttributes):
obj.bump_revision()
# see if any created/updated/deleted objects bump the revision
# of another object
objects_with_related_revisions = [
o for o in session.deleted | session.dirty | session.new
if getattr(o, 'revises_on_change', ())
]
for obj in objects_with_related_revisions:
self._bump_related_revisions(session, obj)
def _bump_related_revisions(self, session, obj):
for revises_col in getattr(obj, 'revises_on_change', ()):
related_obj = self._find_related_obj(session, obj, revises_col)
if not related_obj:
LOG.warning(_LW("Could not find related %(col)s for resource "
"%(obj)s to bump revision."),
{'obj': obj, 'col': revises_col})
continue
# if related object revises others, bump those as well
self._bump_related_revisions(session, related_obj)
# no need to bump revisions on related objects being deleted
if related_obj not in session.deleted:
related_obj.bump_revision()
def get_plugin_type(self):
return "revision_plugin"
def get_plugin_description(self):
return "Adds revision numbers to resources."
def extend_resource_dict_revision(self, plugin, resource_res, resource_db):
resource_res['revision'] = resource_db.revision_number
def _find_related_obj(self, session, obj, relationship_col):
"""Find a related object for an object based on relationship column.
Given a relationship column, find the object that corresponds to it
either in the current session or by looking it up if it's not present.
"""
# first check to see if it's directly attached to the object already
related_obj = getattr(obj, relationship_col)
if related_obj:
return related_obj
rel = getattr(obj.__class__, relationship_col) # get relationship
local_rel_col = list(rel.property.local_columns)[0]
if len(rel.property.local_columns) > 1:
raise RuntimeError(_("Bumping revisions with composite foreign "
"keys not supported"))
related_model = rel.property.mapper.class_
pk = rel.property.mapper.primary_key[0]
rel_id = getattr(obj, local_rel_col.name)
if not rel_id:
return None
for session_obj in session:
if not isinstance(session_obj, related_model):
continue
if getattr(session_obj, pk.name) == rel_id:
return session_obj
# object isn't in session so we have to query for it
related_obj = (
session.query(related_model).filter(pk == rel_id).
first()
)
return related_obj

View File

@ -26,6 +26,7 @@ NETWORK_API_EXTENSIONS="
qos, \
quotas, \
rbac-policies, \
revisions, \
router, \
router_availability_zone, \
security-group, \

View File

@ -0,0 +1,136 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import test
from neutron.tests.tempest.api import base
from neutron.tests.tempest.api import base_security_groups as bsg
from neutron.tests.tempest import config
class TestRevisions(base.BaseAdminNetworkTest, bsg.BaseSecGroupTest):
@classmethod
@test.requires_ext(extension="revisions", service="network")
def skip_checks(cls):
super(TestRevisions, cls).skip_checks()
@test.idempotent_id('4a26a4be-9c53-483c-bc50-b53f1db10ac6')
def test_update_network_bumps_revision(self):
net = self.create_network()
self.assertIn('revision', net)
updated = self.client.update_network(net['id'], name='newnet')
self.assertGreater(updated['network']['revision'], net['revision'])
@test.idempotent_id('cac7ecde-12d5-4331-9a03-420899dea077')
def test_update_port_bumps_revision(self):
net = self.create_network()
port = self.create_port(net)
self.assertIn('revision', port)
updated = self.client.update_port(port['id'], name='newport')
self.assertGreater(updated['port']['revision'], port['revision'])
@test.idempotent_id('c1c4fa41-8e89-44d0-9bfc-409f3b66dc57')
def test_update_subnet_bumps_revision(self):
net = self.create_network()
subnet = self.create_subnet(net)
self.assertIn('revision', subnet)
updated = self.client.update_subnet(subnet['id'], name='newsub')
self.assertGreater(updated['subnet']['revision'], subnet['revision'])
@test.idempotent_id('e8c5d7db-2b8d-4615-a476-6e537437c4f2')
def test_update_subnetpool_bumps_revision(self):
sp = self.create_subnetpool('subnetpool', default_prefixlen=24,
prefixes=['10.0.0.0/8'])
self.assertIn('revision', sp)
updated = self.admin_client.update_subnetpool(sp['id'], name='sp2')
self.assertGreater(updated['subnetpool']['revision'], sp['revision'])
@test.idempotent_id('6c256f71-c929-4200-b3dc-4e1843506be5')
@test.requires_ext(extension="security-group", service="network")
def test_update_sg_group_bumps_revision(self):
sg, name = self._create_security_group()
self.assertIn('revision', sg['security_group'])
update_body = self.client.update_security_group(
sg['security_group']['id'], name='new_sg_name')
self.assertGreater(update_body['security_group']['revision'],
sg['security_group']['revision'])
@test.idempotent_id('6489632f-8550-4453-a674-c98849742967')
@test.requires_ext(extension="security-group", service="network")
def test_update_port_sg_binding_bumps_revision(self):
net = self.create_network()
port = self.create_port(net)
sg = self._create_security_group()[0]
self.client.update_port(
port['id'], security_groups=[sg['security_group']['id']])
updated = self.client.show_port(port['id'])
self.client.update_port(port['id'], security_groups=[])
# TODO(kevinbenton): these extra shows after after the update are
# to work around the fact that ML2 creates the result dict before
# commit happens if the port is unbound. The update response should
# be usable directly once that is fixed.
updated2 = self.client.show_port(port['id'])
self.assertGreater(updated['port']['revision'], port['revision'])
self.assertGreater(updated2['port']['revision'],
updated['port']['revision'])
@test.idempotent_id('29c7ab2b-d1d8-425d-8cec-fcf632960f22')
@test.requires_ext(extension="security-group", service="network")
def test_update_sg_rule_bumps_sg_revision(self):
sg, name = self._create_security_group()
rule = self.client.create_security_group_rule(
security_group_id=sg['security_group']['id'],
protocol='tcp', direction='ingress', ethertype=self.ethertype,
port_range_min=60, port_range_max=70)
updated = self.client.show_security_group(sg['security_group']['id'])
self.assertGreater(updated['security_group']['revision'],
sg['security_group']['revision'])
self.client.delete_security_group_rule(
rule['security_group_rule']['id'])
updated2 = self.client.show_security_group(sg['security_group']['id'])
self.assertGreater(updated2['security_group']['revision'],
updated['security_group']['revision'])
@test.idempotent_id('4a37bde9-1975-47e0-9b8c-2c9ca36415b0')
@test.requires_ext(extension="router", service="network")
def test_update_router_bumps_revision(self):
subnet = self.create_subnet(self.create_network())
router = self.create_router(router_name='test')
self.assertIn('revision', router)
rev1 = router['revision']
router = self.client.update_router(router['id'],
name='test2')['router']
self.assertGreater(router['revision'], rev1)
self.create_router_interface(router['id'], subnet['id'])
updated = self.client.show_router(router['id'])['router']
self.assertGreater(updated['revision'], router['revision'])
@test.idempotent_id('9de71ebc-f5df-4cd0-80bc-60299fce3ce9')
@test.requires_ext(extension="router", service="network")
@test.requires_ext(extension="standard-attr-description",
service="network")
def test_update_floatingip_bumps_revision(self):
ext_id = config.CONF.network.public_network_id
network = self.create_network()
subnet = self.create_subnet(network)
router = self.create_router('test', external_network_id=ext_id)
self.create_router_interface(router['id'], subnet['id'])
port = self.create_port(network)
body = self.client.create_floatingip(
floating_network_id=ext_id,
port_id=port['id'],
description='d1'
)['floatingip']
self.assertIn('revision', body)
b2 = self.client.update_floatingip(body['id'], description='d2')
self.assertGreater(b2['floatingip']['revision'], body['revision'])

View File

@ -0,0 +1,108 @@
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import netaddr
from neutron import context as nctx
from neutron import manager
from neutron.tests.unit.plugins.ml2 import test_plugin
class TestRevisionPlugin(test_plugin.Ml2PluginV2TestCase):
def get_additional_service_plugins(self):
p = super(TestRevisionPlugin, self).get_additional_service_plugins()
p.update({'revision_plugin_name': 'revisions'})
return p
def setUp(self):
super(TestRevisionPlugin, self).setUp()
self.cp = manager.NeutronManager.get_plugin()
self.l3p = (manager.NeutronManager.
get_service_plugins()['L3_ROUTER_NAT'])
self.ctx = nctx.get_admin_context()
def test_port_name_update_revises(self):
with self.port() as port:
rev = port['port']['revision']
new = {'port': {'name': 'seaweed'}}
response = self._update('ports', port['port']['id'], new)
new_rev = response['port']['revision']
self.assertGreater(new_rev, rev)
def test_port_ip_update_revises(self):
with self.port() as port:
rev = port['port']['revision']
new = {'port': {'fixed_ips': port['port']['fixed_ips']}}
# ensure adding an IP allocation updates the port
next_ip = str(netaddr.IPAddress(
new['port']['fixed_ips'][0]['ip_address']) + 1)
new['port']['fixed_ips'].append({'ip_address': next_ip})
response = self._update('ports', port['port']['id'], new)
self.assertEqual(2, len(response['port']['fixed_ips']))
new_rev = response['port']['revision']
self.assertGreater(new_rev, rev)
# ensure deleting an IP allocation updates the port
rev = new_rev
new['port']['fixed_ips'].pop()
response = self._update('ports', port['port']['id'], new)
self.assertEqual(1, len(response['port']['fixed_ips']))
new_rev = response['port']['revision']
self.assertGreater(new_rev, rev)
def test_security_group_rule_ops_bump_security_group(self):
s = {'security_group': {'tenant_id': 'some_tenant', 'name': '',
'description': 's'}}
sg = self.cp.create_security_group(self.ctx, s)
s['security_group']['name'] = 'hello'
updated = self.cp.update_security_group(self.ctx, sg['id'], s)
self.assertGreater(updated['revision'], sg['revision'])
# ensure rule changes bump parent SG
r = {'security_group_rule': {'tenant_id': 'some_tenant',
'port_range_min': 80, 'protocol': 6,
'port_range_max': 90,
'remote_ip_prefix': '0.0.0.0/0',
'ethertype': 'IPv4',
'remote_group_id': None,
'direction': 'ingress',
'security_group_id': sg['id']}}
rule = self.cp.create_security_group_rule(self.ctx, r)
sg = updated
updated = self.cp.get_security_group(self.ctx, sg['id'])
self.assertGreater(updated['revision'], sg['revision'])
self.cp.delete_security_group_rule(self.ctx, rule['id'])
sg = updated
updated = self.cp.get_security_group(self.ctx, sg['id'])
self.assertGreater(updated['revision'], sg['revision'])
def test_router_interface_ops_bump_router(self):
r = {'router': {'name': 'myrouter', 'tenant_id': 'some_tenant',
'admin_state_up': True}}
router = self.l3p.create_router(self.ctx, r)
r['router']['name'] = 'yourrouter'
updated = self.l3p.update_router(self.ctx, router['id'], r)
self.assertGreater(updated['revision'], router['revision'])
# add an intf and make sure it bumps rev
with self.subnet(tenant_id='some_tenant') as s:
interface_info = {'subnet_id': s['subnet']['id']}
self.l3p.add_router_interface(self.ctx, router['id'], interface_info)
router = updated
updated = self.l3p.get_router(self.ctx, router['id'])
self.assertGreater(updated['revision'], router['revision'])
self.l3p.remove_router_interface(self.ctx, router['id'],
interface_info)
router = updated
updated = self.l3p.get_router(self.ctx, router['id'])
self.assertGreater(updated['revision'], router['revision'])

View File

@ -81,6 +81,7 @@ neutron.service_plugins =
auto_allocate = neutron.services.auto_allocate.plugin:Plugin
segments = neutron.services.segments.plugin:Plugin
network_ip_availability = neutron.services.network_ip_availability.plugin:NetworkIPAvailabilityPlugin
revisions = neutron.services.revisions.revision_plugin:RevisionPlugin
timestamp_core = neutron.services.timestamp.timestamp_plugin:TimeStampPlugin
trunk = neutron.services.trunk.plugin:TrunkPlugin
neutron.qos.notification_drivers =