Implement DB consistency

This patch implements a distributed lock based on SQL. Each lock
is associated with a Neutron project (tenant) and a given API
session, which ensures that a lock is acquired and released in
the same API context.

The detailed description is in the spec review:
https://review.openstack.org/268005

Closes-Bug: #1529812
Closes-Bug: #1529326
Closes-Bug: #1497676
Related-Bug: #1527234
Implements: blueprint bp/keep-db-consistency

Change-Id: Iff916481282f2d60df66c0e916f3045f9944531e
Li Ma 2016-02-18 10:30:42 +08:00
parent 227fd0e699
commit 3b92dc8eac
6 changed files with 385 additions and 67 deletions
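
How the lock works, in outline: each Neutron project maps to one row in a
new dflockedobjects table, and taking the lock means atomically flipping
that row's lock column from False to True under SELECT ... FOR UPDATE.
Below is a self-contained sketch of that core compare-and-set step in
plain SQLAlchemy; the engine, table wiring and function name are
illustrative only, and the real implementation follows in
lockedobjects_db.py.

import sqlalchemy as sa

# Illustrative stand-in for the Neutron DB; note SQLite ignores
# FOR UPDATE, so a real deployment would use MySQL or PostgreSQL.
engine = sa.create_engine('sqlite://')
metadata = sa.MetaData()
locks = sa.Table(
    'dflockedobjects', metadata,
    sa.Column('object_uuid', sa.String(36), primary_key=True),
    sa.Column('lock', sa.Boolean, default=False),
    sa.Column('session_id', sa.BigInteger, default=0))
metadata.create_all(engine)


def try_acquire(conn, oid, sid):
    # SELECT ... FOR UPDATE takes a row-level lock, so concurrent
    # callers serialize here and the read-modify-write is atomic.
    row = conn.execute(
        locks.select()
        .where(locks.c.object_uuid == oid)
        .where(locks.c.lock == sa.false())
        .with_for_update()).fetchone()
    if row is None:
        return False  # lock held by another session; caller retries
    conn.execute(locks.update()
                 .where(locks.c.object_uuid == oid)
                 .values(lock=True, session_id=sid))
    return True


with engine.begin() as conn:
    conn.execute(locks.insert().values(object_uuid='p1'))
    print(try_acquire(conn, 'p1', 42))   # True
    print(try_acquire(conn, 'p1', 43))   # False until released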

View File

@@ -57,3 +57,8 @@ class UnsupportedTransportException(DragonflowException):
        not supported.
     """
     message = _("Transport protocol is not supported: %(transport)s")
+
+
+class DBLockFailed(DragonflowException):
+    message = _("The DB Lock cannot be acquired for object=%(oid)s in "
+                "the session=%(sid)s.")

View File

@@ -0,0 +1,217 @@
# Copyright (c) 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
eventlet.monkey_patch()

import inspect
import random
import time

from sqlalchemy import func
from sqlalchemy.orm import exc as orm_exc

from dragonflow._i18n import _LI, _LE
from dragonflow.common import exceptions as df_exceptions
from dragonflow.db.neutron import models

from neutron.db import api as db_api

from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_log import log
from oslo_utils import excutils

import six

# Used to identify each API session
LOCK_SEED = 9876543210

# Used to wait and retry for Galera
DB_INIT_RETRY_INTERVAL = 1
DB_MAX_RETRY_INTERVAL = 10

# Used to wait and retry for the distributed lock
LOCK_MAX_RETRIES = 100
LOCK_INIT_RETRY_INTERVAL = 2
LOCK_MAX_RETRY_INTERVAL = 10
LOCK_INC_RETRY_INTERVAL = 1

# global lock id
GLOBAL_LOCK_ID = "ffffffffffffffffffffffffffffffff"

LOG = log.getLogger(__name__)
class wrap_db_lock(object):
    """Decorator that serializes an API operation under a DB-based lock."""

    def __init__(self):
        super(wrap_db_lock, self).__init__()

    def __call__(self, f):
        @six.wraps(f)
        def wrap_db_lock(*args, **kwargs):
            context = args[1]  # the neutron context object
            session_id = 0
            result = None

            # NOTE(nick-ma-z): In some admin operations in Neutron,
            # the project_id is set to None, so we set it to a global
            # lock id.
            lock_id = context.project_id
            if not lock_id:
                lock_id = GLOBAL_LOCK_ID

            # magic to prevent nested locking: if a decorated method
            # is called from another decorated method, the lock is
            # already held by this API context, so skip acquire/release.
            within_wrapper = False
            for frame in inspect.stack()[1:]:
                if frame[3] == 'wrap_db_lock':
                    within_wrapper = True
                    break

            if not within_wrapper:
                # test and create the lock if necessary
                _test_and_create_object(lock_id)
                session_id = _acquire_lock(lock_id)

            try:
                result = f(*args, **kwargs)
            except Exception:
                with excutils.save_and_reraise_exception() as ctxt:
                    ctxt.reraise = True
            finally:
                if not within_wrapper:
                    try:
                        _release_lock(lock_id, session_id)
                    except Exception as e:
                        LOG.exception(e)

            return result
        return wrap_db_lock
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
                           retry_interval=DB_INIT_RETRY_INTERVAL,
                           inc_retry_interval=True,
                           max_retry_interval=DB_MAX_RETRY_INTERVAL,
                           retry_on_deadlock=True,
                           retry_on_request=True)
def _acquire_lock(oid):
    # generate a temporary session id for this API context
    sid = _generate_session_id()

    # NOTE(nick-ma-z): we disallow subtransactions because the
    # retry logic will bust any parent transactions
    wait_lock_retries = LOCK_MAX_RETRIES
    retry_interval = LOCK_INIT_RETRY_INTERVAL
    while wait_lock_retries > 0:
        try:
            session = db_api.get_session()
            with session.begin():
                LOG.info(_LI("Try to get lock for object %(oid)s in "
                             "session %(sid)s."), {'oid': oid, 'sid': sid})
                row = _get_object_with_lock(session, oid, False)
                _update_lock(session, row, True, session_id=sid)
                LOG.info(_LI("Lock is acquired for object %(oid)s in "
                             "session %(sid)s."), {'oid': oid, 'sid': sid})
                return sid
        except orm_exc.NoResultFound:
            LOG.info(_LI("Lock has been obtained by another session. "
                         "Wait here and retry."))
            time.sleep(retry_interval)
            wait_lock_retries = wait_lock_retries - 1

            # dynamically increase the retry_interval until it reaches
            # the maximum interval, then reset it to the initial value.
            if retry_interval >= LOCK_MAX_RETRY_INTERVAL:
                retry_interval = LOCK_INIT_RETRY_INTERVAL
            else:
                retry_interval = retry_interval + LOCK_INC_RETRY_INTERVAL

    # NOTE(nick-ma-z): The lock cannot be acquired.
    raise df_exceptions.DBLockFailed(oid=oid, sid=sid)
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
                           retry_interval=DB_INIT_RETRY_INTERVAL,
                           inc_retry_interval=True,
                           max_retry_interval=DB_MAX_RETRY_INTERVAL,
                           retry_on_deadlock=True,
                           retry_on_request=True)
def _release_lock(oid, sid):
    # NOTE(nick-ma-z): we disallow subtransactions because the
    # retry logic will bust any parent transactions
    try:
        session = db_api.get_session()
        with session.begin():
            LOG.info(_LI("Try to release lock for object %(oid)s in "
                         "session %(sid)s."), {'oid': oid, 'sid': sid})
            row = _get_object_with_lock(session, oid, True,
                                        session_id=sid)
            _update_lock(session, row, False, session_id=0)
            LOG.info(_LI("Lock is released for object %(oid)s in "
                         "session %(sid)s."), {'oid': oid, 'sid': sid})
    except orm_exc.NoResultFound:
        LOG.exception(_LE("The lock for object %(oid)s is lost in the "
                          "session %(sid)s and obtained by other "
                          "sessions."), {'oid': oid, 'sid': sid})
def _generate_session_id():
    return random.randint(0, LOCK_SEED)


def _test_and_create_object(id):
    try:
        session = db_api.get_session()
        with session.begin():
            session.query(models.DFLockedObjects).filter_by(
                object_uuid=id).one()
    except orm_exc.NoResultFound:
        try:
            session = db_api.get_session()
            with session.begin():
                _create_db_row(session, oid=id)
        except db_exc.DBDuplicateEntry:
            # the lock is concurrently created.
            pass


def _get_object_with_lock(session, id, state, session_id=None):
    row = None
    if session_id:
        row = session.query(models.DFLockedObjects).filter_by(
            object_uuid=id, lock=state,
            session_id=session_id).with_for_update().one()
    else:
        row = session.query(models.DFLockedObjects).filter_by(
            object_uuid=id, lock=state).with_for_update().one()
    return row


def _update_lock(session, row, lock, session_id):
    row.lock = lock
    row.session_id = session_id
    session.merge(row)
    session.flush()


def _create_db_row(session, oid):
    row = models.DFLockedObjects(object_uuid=oid,
                                 lock=False, session_id=0,
                                 created_at=func.now())
    session.add(row)
    session.flush()
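
Taken together, a plugin method opts into the lock simply by decoration.
A hedged usage sketch (the class and method here are hypothetical; the
real call sites are in the plugin diff below):

from dragonflow.db.neutron import lockedobjects_db as lock_db


class ExamplePlugin(object):
    # wrap_db_lock reads args[1] as the Neutron context, so it only
    # fits methods with a (self, context, ...) signature.

    @lock_db.wrap_db_lock()
    def update_thing(self, context, thing_id, thing):
        # The project lock is held for the whole body. Calls into
        # other decorated methods are re-entrant: the inspect.stack()
        # check sees the enclosing wrap_db_lock frame and skips a
        # second acquire.
        return {'id': thing_id, 'updated': True}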

View File

@@ -1 +1 @@
-1dee3dc24674
+f03c862d2645

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Dragonflow versioned objects
Revision ID: f03c862d2645
Revises: 1dee3dc24674
Create Date: 2016-02-18 10:09:29.112343
"""
# revision identifiers, used by Alembic.
revision = 'f03c862d2645'
down_revision = '1dee3dc24674'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'dflockedobjects',
sa.Column('object_uuid', sa.String(36), primary_key=True),
sa.Column('lock', sa.Boolean, default=False),
sa.Column('session_id', sa.BigInteger, default=0),
sa.Column('created_at', sa.DateTime)
)
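
The migration defines only upgrade(). If a downgrade path were needed,
the inverse would presumably be a single drop; a sketch, not part of
this patch:

def downgrade():
    # Hypothetical inverse of upgrade(); not included in this commit.
    op.drop_table('dflockedobjects')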

View File

@@ -0,0 +1,27 @@
# Copyright (c) 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa

from neutron.db import model_base


class DFLockedObjects(model_base.BASEV2):
    __tablename__ = 'dflockedobjects'

    object_uuid = sa.Column(sa.String(36), primary_key=True)
    lock = sa.Column(sa.Boolean, default=False)
    session_id = sa.Column(sa.BigInteger, default=0)
    created_at = sa.Column(sa.DateTime)
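
For reference, the helpers in lockedobjects_db.py reduce to a handful of
ORM operations against this model. A sketch assuming a standard
SQLAlchemy session (the function names here are illustrative):

from sqlalchemy import func

from dragonflow.db.neutron.models import DFLockedObjects


def ensure_row(session, oid):
    # One row per project id, created lazily and never deleted;
    # mirrors _test_and_create_object/_create_db_row.
    session.add(DFLockedObjects(object_uuid=oid, lock=False,
                                session_id=0, created_at=func.now()))
    session.flush()


def held_row(session, oid, sid):
    # Mirrors _get_object_with_lock: blocks other writers on this row
    # until the surrounding transaction ends.
    return session.query(DFLockedObjects).filter_by(
        object_uuid=oid, lock=True,
        session_id=sid).with_for_update().one()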

View File

@@ -54,6 +54,7 @@ from dragonflow.common import common_params
 from dragonflow.common import constants as df_common_const
 from dragonflow.common import exceptions as df_exceptions
 from dragonflow.db import api_nb
+from dragonflow.db.neutron import lockedobjects_db as lock_db
 from dragonflow.neutron.common import constants as df_const
 
 LOG = log.getLogger(__name__)
@@ -172,53 +173,62 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 LOG.exception(_LE("Exception auto-deleting port %s"),
                               port.id)
 
+    @lock_db.wrap_db_lock()
     def create_security_group(self, context, security_group,
                               default_sg=False):
-        sg_db = super(DFPlugin,
-                      self).create_security_group(context, security_group,
-                                                  default_sg)
+        with context.session.begin(subtransactions=True):
+            sg_db = super(DFPlugin,
+                          self).create_security_group(context, security_group,
+                                                      default_sg)
         sg_name = sg_db['id']
         tenant_id = sg_db['tenant_id']
         rules = sg_db.get('security_group_rules')
         self.nb_api.create_security_group(name=sg_name, topic=tenant_id,
                                           rules=rules)
         return sg_db
 
+    @lock_db.wrap_db_lock()
     def create_security_group_rule(self, context, security_group_rule):
-        sg_rule = super(DFPlugin, self).create_security_group_rule(
-            context, security_group_rule)
-        sg_id = sg_rule['security_group_id']
-        sg_group = self.get_security_group(context, sg_id)
+        with context.session.begin(subtransactions=True):
+            sg_rule = super(DFPlugin, self).create_security_group_rule(
+                context, security_group_rule)
+            sg_id = sg_rule['security_group_id']
+            sg_group = self.get_security_group(context, sg_id)
         self.nb_api.add_security_group_rules(sg_id, [sg_rule],
                                              sg_group['tenant_id'])
         return sg_rule
 
+    @lock_db.wrap_db_lock()
     def delete_security_group_rule(self, context, id):
-        security_group_rule = self.get_security_group_rule(context, id)
-        sg_id = security_group_rule['security_group_id']
-        sg_group = self.get_security_group(context, sg_id)
-        super(DFPlugin, self).delete_security_group_rule(context, id)
+        with context.session.begin(subtransactions=True):
+            security_group_rule = self.get_security_group_rule(context, id)
+            sg_id = security_group_rule['security_group_id']
+            sg_group = self.get_security_group(context, sg_id)
+            super(DFPlugin, self).delete_security_group_rule(context, id)
         self.nb_api.delete_security_group_rule(sg_id, id,
                                                sg_group['tenant_id'])
 
+    @lock_db.wrap_db_lock()
     def delete_security_group(self, context, sg_id):
         sg = self.get_security_group(context, sg_id)
         tenant_id = sg['tenant_id']
-        super(DFPlugin, self).delete_security_group(context,
-                                                    sg_id)
+        with context.session.begin(subtransactions=True):
+            super(DFPlugin, self).delete_security_group(context, sg_id)
         self.nb_api.delete_security_group(sg_id, topic=tenant_id)
 
+    @lock_db.wrap_db_lock()
     def create_subnet(self, context, subnet):
-        net_id = subnet['subnet']['network_id']
         new_subnet = None
         with context.session.begin(subtransactions=True):
             # create subnet in DB
             new_subnet = super(DFPlugin,
                                self).create_subnet(context, subnet)
+            net_id = new_subnet['network_id']
             dhcp_address = self._handle_create_subnet_dhcp(
-                context,
-                new_subnet)
-        # update df controller with subnet
+                context, new_subnet)
         if new_subnet:
             self.nb_api.add_subnet(
                 new_subnet['id'],
                 net_id,
@@ -228,20 +238,22 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 dhcp_ip=dhcp_address,
                 gateway_ip=new_subnet['gateway_ip'],
                 dns_nameservers=new_subnet.get('dns_nameservers', []))
         return new_subnet
 
+    @lock_db.wrap_db_lock()
     def update_subnet(self, context, id, subnet):
         with context.session.begin(subtransactions=True):
             # update subnet in DB
             original_subnet = super(DFPlugin, self).get_subnet(context, id)
             new_subnet = super(DFPlugin,
                                self).update_subnet(context, id, subnet)
+            net_id = new_subnet['network_id']
             dhcp_address = self._update_subnet_dhcp(
                 context,
                 original_subnet,
                 new_subnet)
-        net_id = new_subnet['network_id']
         if new_subnet and net_id:
             # update df controller with subnet
             self.nb_api.update_subnet(
                 new_subnet['id'],
@@ -252,15 +264,18 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 dhcp_ip=dhcp_address,
                 gateway_ip=new_subnet['gateway_ip'],
                 dns_nameservers=new_subnet.get('dns_nameservers', []))
-            return new_subnet
+        return new_subnet
 
+    @lock_db.wrap_db_lock()
     def delete_subnet(self, context, id):
         orig_subnet = super(DFPlugin, self).get_subnet(context, id)
         net_id = orig_subnet['network_id']
         with context.session.begin(subtransactions=True):
             # delete subnet in DB
             super(DFPlugin, self).delete_subnet(context, id)
-            # update df controller with subnet delete
+        # update df controller with subnet delete
         if net_id:
             try:
                 self.nb_api.delete_subnet(id, net_id,
                                           orig_subnet['tenant_id'])
@@ -268,24 +283,24 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 LOG.debug("network %s is not found in DB, might have "
                           "been deleted concurrently" % net_id)
 
+    @lock_db.wrap_db_lock()
     def create_network(self, context, network):
         with context.session.begin(subtransactions=True):
             result = super(DFPlugin, self).create_network(context,
                                                           network)
             self._process_l3_create(context, result, network['network'])
-        return self.create_network_nb_api(result)
+        self.create_network_nb_api(context, result)
+        return result
 
-    def create_network_nb_api(self, network):
+    def create_network_nb_api(self, context, network):
         external_ids = {df_const.DF_NETWORK_NAME_EXT_ID_KEY: network['name']}
         # TODO(DF): Undo logical switch creation on failure
         self.nb_api.create_lswitch(name=network['id'],
                                    topic=network['tenant_id'],
                                    external_ids=external_ids,
                                    subnets=[])
         return network
 
+    @lock_db.wrap_db_lock()
     def delete_network(self, context, network_id):
         with context.session.begin(subtransactions=True):
             network = self.get_network(context, network_id)
@@ -305,20 +320,22 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                     LOG.debug("port %s is not found in DB, might have "
                               "been deleted concurrently" % port.get_id())
         try:
-            self.nb_api.delete_lswitch(name=network_id, topic=tenant_id)
+            self.nb_api.delete_lswitch(name=network_id,
+                                       topic=tenant_id)
         except df_exceptions.DBKeyNotFound:
             LOG.debug("lswitch %s is not found in DF DB, might have "
                       "been deleted concurrently" % network_id)
 
+    @lock_db.wrap_db_lock()
     def update_network(self, context, network_id, network):
         pnet._raise_if_updates_provider_attributes(network['network'])
         # TODO(gsagie) rollback needed
         with context.session.begin(subtransactions=True):
             result = super(DFPlugin, self).update_network(context, network_id,
                                                           network)
             self._process_l3_update(context, result, network['network'])
-            return result
+        return result
 
+    @lock_db.wrap_db_lock()
     def update_port(self, context, id, port):
         with context.session.begin(subtransactions=True):
             parent_name, tag = self._get_data_from_binding_profile(
@@ -338,6 +355,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 id,
                 port,
                 updated_port=updated_port)
+
             external_ids = {
                 df_const.DF_PORT_NAME_EXT_ID_KEY: updated_port['name']}
             allowed_macs = self._get_allowed_mac_addresses_from_port(
@@ -368,11 +386,12 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 macs=[updated_port['mac_address']], ips=ips,
                 external_ids=external_ids,
                 parent_name=parent_name, tag=tag,
-                enabled=updated_port['admin_state_up'],
+                enabled=updated_port['admin_'
+                                     'state_up'],
                 port_security=allowed_macs,
                 chassis=chassis,
-                device_owner=updated_port.get('device_owner',
-                                              None),
+                device_owner=updated_port.get(
+                    'device_owner', None),
                 security_groups=security_groups)
         return updated_port
@@ -418,6 +437,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 allowed_macs.add(allowed_address['mac_address'])
         return list(allowed_macs)
 
+    @lock_db.wrap_db_lock()
     def create_port(self, context, port):
         with context.session.begin(subtransactions=True):
             parent_name, tag = self._get_data_from_binding_profile(
@@ -439,7 +459,6 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                     port['port'][df_const.DF_PORT_BINDING_PROFILE])
             self._process_port_create_extra_dhcp_opts(context, db_port,
                                                       dhcp_opts)
-
             # This extra lookup is necessary to get the latest db model
             # for the extension functions.
             port_model = self._get_port(context, db_port['id'])
@@ -507,6 +526,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                       {'port_id': port['id'],
                        'port_owner': port['device_owner']})
 
+    @lock_db.wrap_db_lock()
     def delete_port(self, context, port_id, l3_port_check=True):
         self._pre_delete_port(context, port_id, l3_port_check)
         try:
@@ -516,6 +536,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
         except df_exceptions.DBKeyNotFound:
             LOG.debug("port %s is not found in DF DB, might have "
                       "been deleted concurrently" % port_id)
+
         with context.session.begin(subtransactions=True):
             self.disassociate_floatingips(context, port_id)
             super(DFPlugin, self).delete_port(context, port_id)
@@ -524,9 +545,12 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
         super(DFPlugin, self).extend_port_dict_binding(port_res, port_db)
         port_res[portbindings.VNIC_TYPE] = portbindings.VNIC_NORMAL
 
+    @lock_db.wrap_db_lock()
     def create_router(self, context, router):
-        router = super(DFPlugin, self).create_router(
-            context, router)
+        with context.session.begin(subtransactions=True):
+            router = super(DFPlugin, self).create_router(
+                context, router)
         router_name = router['id']
         tenant_id = router['tenant_id']
         is_distributed = router.get('distributed', False)
@@ -536,23 +560,25 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                                    external_ids=external_ids,
                                    distributed=is_distributed,
                                    ports=[])
         # TODO(gsagie) rollback router creation on failure
         return router
 
+    @lock_db.wrap_db_lock()
     def delete_router(self, context, router_id):
         router_name = router_id
+        router = self.get_router(context, router_id)
         try:
-            router = self.get_router(context, router_id)
             self.nb_api.delete_lrouter(name=router_name,
                                        topic=router['tenant_id'])
         except df_exceptions.DBKeyNotFound:
             LOG.debug("router %s is not found in DF DB, might have "
                       "been deleted concurrently" % router_name)
-        ret_val = super(DFPlugin, self).delete_router(context,
-                                                      router_id)
+        with context.session.begin(subtransactions=True):
+            ret_val = super(DFPlugin, self).delete_router(context,
+                                                          router_id)
         return ret_val
 
+    @lock_db.wrap_db_lock()
     def add_router_interface(self, context, router_id, interface_info):
         add_by_port, add_by_sub = self._validate_interface_info(
             interface_info)
@@ -589,12 +615,17 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
             interface_info['port_id'] = port['id']
             if 'subnet_id' in interface_info:
                 del interface_info['subnet_id']
-        return super(DFPlugin, self).add_router_interface(
-            context, router_id, interface_info)
+        with context.session.begin(subtransactions=True):
+            result = super(DFPlugin, self).add_router_interface(
+                context, router_id, interface_info)
+        return result
 
+    @lock_db.wrap_db_lock()
     def remove_router_interface(self, context, router_id, interface_info):
-        new_router = super(DFPlugin, self).remove_router_interface(
-            context, router_id, interface_info)
+        with context.session.begin(subtransactions=True):
+            new_router = super(DFPlugin, self).remove_router_interface(
+                context, router_id, interface_info)
         subnet = self.get_subnet(context, new_router['subnet_id'])
         network_id = subnet['network_id']
@@ -604,9 +635,9 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                     network_id,
                     subnet['tenant_id'])
         except df_exceptions.DBKeyNotFound:
-            LOG.debug("logical router %s is not found in DF DB, suppressing "
-                      " delete_lrouter_port exception" % router_id)
+            LOG.debug("logical router %s is not found in DF DB, "
+                      "suppressing delete_lrouter_port "
+                      "exception" % router_id)
         return new_router
 
     def _create_dhcp_server_port(self, context, subnet):
@@ -703,37 +734,38 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
                 return self._get_ip_from_port(dhcp_port)
         return None
 
+    @lock_db.wrap_db_lock()
     def create_floatingip(self, context, floatingip):
         with context.session.begin(subtransactions=True):
             floatingip_dict = super(DFPlugin, self).create_floatingip(
                 context, floatingip,
                 initial_status=const.FLOATINGIP_STATUS_DOWN)
-            self.nb_api.create_floatingip(
-                name=floatingip_dict['id'],
-                topic=floatingip_dict['tenant_id'],
-                floating_ip_address=floatingip_dict['floating_ip_address'],
-                floating_network_id=floatingip_dict['floating_network_id'],
-                router_id=floatingip_dict['router_id'],
-                port_id=floatingip_dict['port_id'],
-                fixed_ip_address=floatingip_dict['fixed_ip_address'],
-                status=floatingip_dict['status'])
+        self.nb_api.create_floatingip(
+            name=floatingip_dict['id'],
+            topic=floatingip_dict['tenant_id'],
+            floating_ip_address=floatingip_dict['floating_ip_address'],
+            floating_network_id=floatingip_dict['floating_network_id'],
+            router_id=floatingip_dict['router_id'],
+            port_id=floatingip_dict['port_id'],
+            fixed_ip_address=floatingip_dict['fixed_ip_address'],
+            status=floatingip_dict['status'])
         return floatingip_dict
 
+    @lock_db.wrap_db_lock()
     def update_floatingip(self, context, id, floatingip):
         with context.session.begin(subtransactions=True):
             floatingip_dict = super(DFPlugin, self).update_floatingip(
                 context, id, floatingip)
-            self.nb_api.update_floatingip(
-                name=floatingip_dict['id'],
-                topic=floatingip_dict['tenant_id'],
-                router_id=floatingip_dict['router_id'],
-                port_id=floatingip_dict['port_id'],
-                fixed_ip_address=floatingip_dict['fixed_ip_address'],
-                status=floatingip_dict['status'])
+        self.nb_api.update_floatingip(
+            name=floatingip_dict['id'],
+            topic=floatingip_dict['tenant_id'],
+            router_id=floatingip_dict['router_id'],
+            port_id=floatingip_dict['port_id'],
+            fixed_ip_address=floatingip_dict['fixed_ip_address'],
+            status=floatingip_dict['status'])
         return floatingip_dict
 
+    @lock_db.wrap_db_lock()
     def delete_floatingip(self, context, id):
         with context.session.begin(subtransactions=True):
             floatingip = self.get_floatingip(context, id)
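
A property of the plugin changes worth calling out: decorated methods can
invoke other decorated methods (Neutron may create a default security
group while creating a network, for instance), and only the outermost
call should take the lock, otherwise an API context would deadlock on its
own project lock. The runnable toy below isolates the stack-inspection
behavior that makes nested calls re-entrant; print statements stand in
for the real acquire/release.

import inspect


def wrap_db_lock_demo(f):
    # Simplified stand-in for wrap_db_lock's re-entrancy check: the
    # real decorator scans inspect.stack() for an enclosing frame
    # named 'wrap_db_lock' and skips acquire/release when one is found.
    def wrap_db_lock(*args, **kwargs):
        nested = any(frame[3] == 'wrap_db_lock'
                     for frame in inspect.stack()[1:])
        if not nested:
            print('acquire')
        try:
            return f(*args, **kwargs)
        finally:
            if not nested:
                print('release')
    return wrap_db_lock


@wrap_db_lock_demo
def outer():
    inner()


@wrap_db_lock_demo
def inner():
    print('inner body')


outer()
# prints: acquire / inner body / release -- inner() never re-acquires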