Implement DB consistency
It is a distributed lock based on SQL. Each lock is associated with a Neutron project(tenant) and a given API session. It ensures that a lock is acquired and released in the same API context. The detailed description is in the spec review. https://review.openstack.org/268005 Closes-Bug: #1529812 Closes-Bug: #1529326 Closes-Bug: #1497676 Related-Bug: #1527234 Implements: blueprint bp/keep-db-consistency Change-Id: Iff916481282f2d60df66c0e916f3045f9944531e
This commit is contained in:
parent
227fd0e699
commit
3b92dc8eac
|
@ -57,3 +57,8 @@ class UnsupportedTransportException(DragonflowException):
|
|||
not supported.
|
||||
"""
|
||||
message = _("Transport protocol is not supported: %(transport)s")
|
||||
|
||||
|
||||
class DBLockFailed(DragonflowException):
|
||||
message = _("The DB Lock cannot be acquired for object=%(oid)s in"
|
||||
"the session=%(sid)s.")
|
||||
|
|
|
@ -0,0 +1,217 @@
|
|||
# Copyright (c) 2015 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import inspect
|
||||
import random
|
||||
import time
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.orm import exc as orm_exc
|
||||
|
||||
from dragonflow._i18n import _LI, _LE
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow.db.neutron import models
|
||||
|
||||
from neutron.db import api as db_api
|
||||
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
|
||||
# Used to identify each API session
|
||||
LOCK_SEED = 9876543210
|
||||
|
||||
# Used to wait and retry for Galera
|
||||
DB_INIT_RETRY_INTERVAL = 1
|
||||
DB_MAX_RETRY_INTERVAL = 10
|
||||
|
||||
# Used to wait and retry for distributed lock
|
||||
LOCK_MAX_RETRIES = 100
|
||||
LOCK_INIT_RETRY_INTERVAL = 2
|
||||
LOCK_MAX_RETRY_INTERVAL = 10
|
||||
LOCK_INC_RETRY_INTERVAL = 1
|
||||
|
||||
# global lock id
|
||||
GLOBAL_LOCK_ID = "ffffffffffffffffffffffffffffffff"
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class wrap_db_lock(object):
|
||||
|
||||
def __init__(self):
|
||||
super(wrap_db_lock, self).__init__()
|
||||
|
||||
def __call__(self, f):
|
||||
@six.wraps(f)
|
||||
def wrap_db_lock(*args, **kwargs):
|
||||
context = args[1] # the neutron context object
|
||||
session_id = 0
|
||||
result = None
|
||||
|
||||
# NOTE(nick-ma-z): In some admin operations in Neutron,
|
||||
# the project_id is set to None, so we set it to a global
|
||||
# lock id.
|
||||
lock_id = context.project_id
|
||||
if not lock_id:
|
||||
lock_id = GLOBAL_LOCK_ID
|
||||
|
||||
# magic to prevent from nested lock
|
||||
within_wrapper = False
|
||||
for frame in inspect.stack()[1:]:
|
||||
if frame[3] == 'wrap_db_lock':
|
||||
within_wrapper = True
|
||||
break
|
||||
|
||||
if not within_wrapper:
|
||||
# test and create the lock if necessary
|
||||
_test_and_create_object(lock_id)
|
||||
session_id = _acquire_lock(lock_id)
|
||||
|
||||
try:
|
||||
result = f(*args, **kwargs)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
ctxt.reraise = True
|
||||
finally:
|
||||
if not within_wrapper:
|
||||
try:
|
||||
_release_lock(lock_id, session_id)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
|
||||
return result
|
||||
return wrap_db_lock
|
||||
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
|
||||
retry_interval=DB_INIT_RETRY_INTERVAL,
|
||||
inc_retry_interval=True,
|
||||
max_retry_interval=DB_MAX_RETRY_INTERVAL,
|
||||
retry_on_deadlock=True,
|
||||
retry_on_request=True)
|
||||
def _acquire_lock(oid):
|
||||
# generate temporary session id for this API context
|
||||
sid = _generate_session_id()
|
||||
|
||||
# NOTE(nick-ma-z): we disallow subtransactions because the
|
||||
# retry logic will bust any parent transactions
|
||||
wait_lock_retries = LOCK_MAX_RETRIES
|
||||
retry_interval = LOCK_INIT_RETRY_INTERVAL
|
||||
while(wait_lock_retries > 0):
|
||||
try:
|
||||
session = db_api.get_session()
|
||||
with session.begin():
|
||||
LOG.info(_LI("Try to get lock for object %(oid)s in "
|
||||
"session %(sid)s."), {'oid': oid, 'sid': sid})
|
||||
row = _get_object_with_lock(session, oid, False)
|
||||
_update_lock(session, row, True, session_id=sid)
|
||||
LOG.info(_LI("Lock is acquired for object %(oid)s in "
|
||||
"session %(sid)s."), {'oid': oid, 'sid': sid})
|
||||
return sid
|
||||
except orm_exc.NoResultFound:
|
||||
LOG.info(_LI("Lock has been obtained by other sessions. "
|
||||
"Wait here and retry."))
|
||||
time.sleep(retry_interval)
|
||||
wait_lock_retries = wait_lock_retries - 1
|
||||
|
||||
# dynamically increase the retry_interval until it reaches
|
||||
# the maximum of interval and then return to the initial value.
|
||||
if retry_interval >= LOCK_MAX_RETRY_INTERVAL:
|
||||
retry_interval = LOCK_INIT_RETRY_INTERVAL
|
||||
else:
|
||||
retry_interval = retry_interval + LOCK_INC_RETRY_INTERVAL
|
||||
|
||||
# NOTE(nick-ma-z): The lock cannot be acquired.
|
||||
raise df_exceptions.DBLockFailed(oid=oid, sid=sid)
|
||||
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
|
||||
retry_interval=DB_INIT_RETRY_INTERVAL,
|
||||
inc_retry_interval=True,
|
||||
max_retry_interval=DB_MAX_RETRY_INTERVAL,
|
||||
retry_on_deadlock=True,
|
||||
retry_on_request=True)
|
||||
def _release_lock(oid, sid):
|
||||
# NOTE(nick-ma-z): we disallow subtransactions because the
|
||||
# retry logic will bust any parent transactions
|
||||
try:
|
||||
session = db_api.get_session()
|
||||
with session.begin():
|
||||
LOG.info(_LI("Try to get lock for object %(oid)s in "
|
||||
"session %(sid)s."), {'oid': oid, 'sid': sid})
|
||||
row = _get_object_with_lock(session, oid, True,
|
||||
session_id=sid)
|
||||
_update_lock(session, row, False, session_id=0)
|
||||
LOG.info(_LI("Lock is released for object %(oid)s in "
|
||||
"session %(sid)s."), {'oid': oid, 'sid': sid})
|
||||
except orm_exc.NoResultFound:
|
||||
LOG.exception(_LE("The lock for object %(oid)s is lost in the "
|
||||
"session %(sid)s and obtained by other "
|
||||
"sessions."), {'oid': oid, 'sid': sid})
|
||||
|
||||
|
||||
def _generate_session_id():
|
||||
return random.randint(0, LOCK_SEED)
|
||||
|
||||
|
||||
def _test_and_create_object(id):
|
||||
try:
|
||||
session = db_api.get_session()
|
||||
with session.begin():
|
||||
session.query(models.DFLockedObjects).filter_by(
|
||||
object_uuid=id).one()
|
||||
except orm_exc.NoResultFound:
|
||||
try:
|
||||
session = db_api.get_session()
|
||||
with session.begin():
|
||||
_create_db_row(session, oid=id)
|
||||
except db_exc.DBDuplicateEntry:
|
||||
# the lock is concurrently created.
|
||||
pass
|
||||
|
||||
|
||||
def _get_object_with_lock(session, id, state, session_id=None):
|
||||
row = None
|
||||
if session_id:
|
||||
row = session.query(models.DFLockedObjects).filter_by(
|
||||
object_uuid=id, lock=state,
|
||||
session_id=session_id).with_for_update().one()
|
||||
else:
|
||||
row = session.query(models.DFLockedObjects).filter_by(
|
||||
object_uuid=id, lock=state).with_for_update().one()
|
||||
return row
|
||||
|
||||
|
||||
def _update_lock(session, row, lock, session_id):
|
||||
row.lock = lock
|
||||
row.session_id = session_id
|
||||
session.merge(row)
|
||||
session.flush()
|
||||
|
||||
|
||||
def _create_db_row(session, oid):
|
||||
row = models.DFLockedObjects(object_uuid=oid,
|
||||
lock=False, session_id=0,
|
||||
created_at=func.now())
|
||||
session.add(row)
|
||||
session.flush()
|
|
@ -1 +1 @@
|
|||
1dee3dc24674
|
||||
f03c862d2645
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
# Copyright (c) 2015 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
"""Dragonflow versioned objects
|
||||
Revision ID: f03c862d2645
|
||||
Revises: 1dee3dc24674
|
||||
Create Date: 2016-02-18 10:09:29.112343
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f03c862d2645'
|
||||
down_revision = '1dee3dc24674'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.create_table(
|
||||
'dflockedobjects',
|
||||
sa.Column('object_uuid', sa.String(36), primary_key=True),
|
||||
sa.Column('lock', sa.Boolean, default=False),
|
||||
sa.Column('session_id', sa.BigInteger, default=0),
|
||||
sa.Column('created_at', sa.DateTime)
|
||||
)
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright (c) 2015 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from neutron.db import model_base
|
||||
|
||||
|
||||
class DFLockedObjects(model_base.BASEV2):
|
||||
__tablename__ = 'dflockedobjects'
|
||||
|
||||
object_uuid = sa.Column(sa.String(36), primary_key=True)
|
||||
lock = sa.Column(sa.Boolean, default=False)
|
||||
session_id = sa.Column(sa.BigInteger, default=0)
|
||||
created_at = sa.Column(sa.DateTime)
|
|
@ -54,6 +54,7 @@ from dragonflow.common import common_params
|
|||
from dragonflow.common import constants as df_common_const
|
||||
from dragonflow.common import exceptions as df_exceptions
|
||||
from dragonflow.db import api_nb
|
||||
from dragonflow.db.neutron import lockedobjects_db as lock_db
|
||||
from dragonflow.neutron.common import constants as df_const
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -172,53 +173,62 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.exception(_LE("Exception auto-deleting port %s"),
|
||||
port.id)
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_security_group(self, context, security_group,
|
||||
default_sg=False):
|
||||
sg_db = super(DFPlugin,
|
||||
self).create_security_group(context, security_group,
|
||||
default_sg)
|
||||
with context.session.begin(subtransactions=True):
|
||||
sg_db = super(DFPlugin,
|
||||
self).create_security_group(context, security_group,
|
||||
default_sg)
|
||||
sg_name = sg_db['id']
|
||||
tenant_id = sg_db['tenant_id']
|
||||
rules = sg_db.get('security_group_rules')
|
||||
|
||||
self.nb_api.create_security_group(name=sg_name, topic=tenant_id,
|
||||
rules=rules)
|
||||
|
||||
return sg_db
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_security_group_rule(self, context, security_group_rule):
|
||||
sg_rule = super(DFPlugin, self).create_security_group_rule(
|
||||
context, security_group_rule)
|
||||
sg_id = sg_rule['security_group_id']
|
||||
sg_group = self.get_security_group(context, sg_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
sg_rule = super(DFPlugin, self).create_security_group_rule(
|
||||
context, security_group_rule)
|
||||
sg_id = sg_rule['security_group_id']
|
||||
sg_group = self.get_security_group(context, sg_id)
|
||||
self.nb_api.add_security_group_rules(sg_id, [sg_rule],
|
||||
sg_group['tenant_id'])
|
||||
return sg_rule
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_security_group_rule(self, context, id):
|
||||
security_group_rule = self.get_security_group_rule(context, id)
|
||||
sg_id = security_group_rule['security_group_id']
|
||||
sg_group = self.get_security_group(context, sg_id)
|
||||
super(DFPlugin, self).delete_security_group_rule(context, id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
security_group_rule = self.get_security_group_rule(context, id)
|
||||
sg_id = security_group_rule['security_group_id']
|
||||
sg_group = self.get_security_group(context, sg_id)
|
||||
super(DFPlugin, self).delete_security_group_rule(context, id)
|
||||
self.nb_api.delete_security_group_rule(sg_id, id,
|
||||
sg_group['tenant_id'])
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_security_group(self, context, sg_id):
|
||||
sg = self.get_security_group(context, sg_id)
|
||||
tenant_id = sg['tenant_id']
|
||||
super(DFPlugin, self).delete_security_group(context,
|
||||
sg_id)
|
||||
with context.session.begin(subtransactions=True):
|
||||
super(DFPlugin, self).delete_security_group(context, sg_id)
|
||||
self.nb_api.delete_security_group(sg_id, topic=tenant_id)
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_subnet(self, context, subnet):
|
||||
net_id = subnet['subnet']['network_id']
|
||||
new_subnet = None
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
# create subnet in DB
|
||||
new_subnet = super(DFPlugin,
|
||||
self).create_subnet(context, subnet)
|
||||
net_id = new_subnet['network_id']
|
||||
dhcp_address = self._handle_create_subnet_dhcp(
|
||||
context,
|
||||
new_subnet)
|
||||
# update df controller with subnet
|
||||
context, new_subnet)
|
||||
if new_subnet:
|
||||
self.nb_api.add_subnet(
|
||||
new_subnet['id'],
|
||||
net_id,
|
||||
|
@ -228,20 +238,22 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
dhcp_ip=dhcp_address,
|
||||
gateway_ip=new_subnet['gateway_ip'],
|
||||
dns_nameservers=new_subnet.get('dns_nameservers', []))
|
||||
|
||||
return new_subnet
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def update_subnet(self, context, id, subnet):
|
||||
with context.session.begin(subtransactions=True):
|
||||
# update subnet in DB
|
||||
original_subnet = super(DFPlugin, self).get_subnet(context, id)
|
||||
new_subnet = super(DFPlugin,
|
||||
self).update_subnet(context, id, subnet)
|
||||
net_id = new_subnet['network_id']
|
||||
dhcp_address = self._update_subnet_dhcp(
|
||||
context,
|
||||
original_subnet,
|
||||
new_subnet)
|
||||
net_id = new_subnet['network_id']
|
||||
|
||||
if new_subnet and net_id:
|
||||
# update df controller with subnet
|
||||
self.nb_api.update_subnet(
|
||||
new_subnet['id'],
|
||||
|
@ -252,15 +264,18 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
dhcp_ip=dhcp_address,
|
||||
gateway_ip=new_subnet['gateway_ip'],
|
||||
dns_nameservers=new_subnet.get('dns_nameservers', []))
|
||||
return new_subnet
|
||||
return new_subnet
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_subnet(self, context, id):
|
||||
orig_subnet = super(DFPlugin, self).get_subnet(context, id)
|
||||
net_id = orig_subnet['network_id']
|
||||
with context.session.begin(subtransactions=True):
|
||||
# delete subnet in DB
|
||||
super(DFPlugin, self).delete_subnet(context, id)
|
||||
# update df controller with subnet delete
|
||||
|
||||
# update df controller with subnet delete
|
||||
if net_id:
|
||||
try:
|
||||
self.nb_api.delete_subnet(id, net_id,
|
||||
orig_subnet['tenant_id'])
|
||||
|
@ -268,24 +283,24 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.debug("network %s is not found in DB, might have "
|
||||
"been deleted concurrently" % net_id)
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_network(self, context, network):
|
||||
with context.session.begin(subtransactions=True):
|
||||
result = super(DFPlugin, self).create_network(context,
|
||||
network)
|
||||
self._process_l3_create(context, result, network['network'])
|
||||
self.create_network_nb_api(context, result)
|
||||
return result
|
||||
|
||||
return self.create_network_nb_api(result)
|
||||
|
||||
def create_network_nb_api(self, network):
|
||||
def create_network_nb_api(self, context, network):
|
||||
external_ids = {df_const.DF_NETWORK_NAME_EXT_ID_KEY: network['name']}
|
||||
|
||||
# TODO(DF): Undo logical switch creation on failure
|
||||
self.nb_api.create_lswitch(name=network['id'],
|
||||
topic=network['tenant_id'],
|
||||
external_ids=external_ids,
|
||||
subnets=[])
|
||||
return network
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_network(self, context, network_id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
network = self.get_network(context, network_id)
|
||||
|
@ -305,20 +320,22 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
LOG.debug("port %s is not found in DB, might have"
|
||||
"been deleted concurrently" % port.get_id())
|
||||
try:
|
||||
self.nb_api.delete_lswitch(name=network_id, topic=tenant_id)
|
||||
self.nb_api.delete_lswitch(name=network_id,
|
||||
topic=tenant_id)
|
||||
except df_exceptions.DBKeyNotFound:
|
||||
LOG.debug("lswitch %s is not found in DF DB, might have "
|
||||
"been deleted concurrently" % network_id)
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def update_network(self, context, network_id, network):
|
||||
pnet._raise_if_updates_provider_attributes(network['network'])
|
||||
# TODO(gsagie) rollback needed
|
||||
with context.session.begin(subtransactions=True):
|
||||
result = super(DFPlugin, self).update_network(context, network_id,
|
||||
network)
|
||||
self._process_l3_update(context, result, network['network'])
|
||||
return result
|
||||
return result
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def update_port(self, context, id, port):
|
||||
with context.session.begin(subtransactions=True):
|
||||
parent_name, tag = self._get_data_from_binding_profile(
|
||||
|
@ -338,6 +355,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
id,
|
||||
port,
|
||||
updated_port=updated_port)
|
||||
|
||||
external_ids = {
|
||||
df_const.DF_PORT_NAME_EXT_ID_KEY: updated_port['name']}
|
||||
allowed_macs = self._get_allowed_mac_addresses_from_port(
|
||||
|
@ -368,11 +386,12 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
macs=[updated_port['mac_address']], ips=ips,
|
||||
external_ids=external_ids,
|
||||
parent_name=parent_name, tag=tag,
|
||||
enabled=updated_port['admin_state_up'],
|
||||
enabled=updated_port['admin_'
|
||||
'state_up'],
|
||||
port_security=allowed_macs,
|
||||
chassis=chassis,
|
||||
device_owner=updated_port.get('device_owner',
|
||||
None),
|
||||
device_owner=updated_port.get(
|
||||
'device_owner', None),
|
||||
security_groups=security_groups)
|
||||
return updated_port
|
||||
|
||||
|
@ -418,6 +437,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
allowed_macs.add(allowed_address['mac_address'])
|
||||
return list(allowed_macs)
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_port(self, context, port):
|
||||
with context.session.begin(subtransactions=True):
|
||||
parent_name, tag = self._get_data_from_binding_profile(
|
||||
|
@ -439,7 +459,6 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
port['port'][df_const.DF_PORT_BINDING_PROFILE])
|
||||
self._process_port_create_extra_dhcp_opts(context, db_port,
|
||||
dhcp_opts)
|
||||
|
||||
# This extra lookup is necessary to get the latest db model
|
||||
# for the extension functions.
|
||||
port_model = self._get_port(context, db_port['id'])
|
||||
|
@ -507,6 +526,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
{'port_id': port['id'],
|
||||
'port_owner': port['device_owner']})
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_port(self, context, port_id, l3_port_check=True):
|
||||
self._pre_delete_port(context, port_id, l3_port_check)
|
||||
try:
|
||||
|
@ -516,6 +536,7 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
except df_exceptions.DBKeyNotFound:
|
||||
LOG.debug("port %s is not found in DF DB, might have "
|
||||
"been deleted concurrently" % port_id)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
self.disassociate_floatingips(context, port_id)
|
||||
super(DFPlugin, self).delete_port(context, port_id)
|
||||
|
@ -524,9 +545,12 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
super(DFPlugin, self).extend_port_dict_binding(port_res, port_db)
|
||||
port_res[portbindings.VNIC_TYPE] = portbindings.VNIC_NORMAL
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_router(self, context, router):
|
||||
router = super(DFPlugin, self).create_router(
|
||||
context, router)
|
||||
with context.session.begin(subtransactions=True):
|
||||
router = super(DFPlugin, self).create_router(
|
||||
context, router)
|
||||
|
||||
router_name = router['id']
|
||||
tenant_id = router['tenant_id']
|
||||
is_distributed = router.get('distributed', False)
|
||||
|
@ -536,23 +560,25 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
external_ids=external_ids,
|
||||
distributed=is_distributed,
|
||||
ports=[])
|
||||
|
||||
# TODO(gsagie) rollback router creation on failure
|
||||
return router
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_router(self, context, router_id):
|
||||
router_name = router_id
|
||||
router = self.get_router(context, router_id)
|
||||
try:
|
||||
router = self.get_router(context, router_id)
|
||||
self.nb_api.delete_lrouter(name=router_name,
|
||||
topic=router['tenant_id'])
|
||||
except df_exceptions.DBKeyNotFound:
|
||||
LOG.debug("router %s is not found in DF DB, might have "
|
||||
"been deleted concurrently" % router_name)
|
||||
ret_val = super(DFPlugin, self).delete_router(context,
|
||||
router_id)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
ret_val = super(DFPlugin, self).delete_router(context,
|
||||
router_id)
|
||||
return ret_val
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def add_router_interface(self, context, router_id, interface_info):
|
||||
add_by_port, add_by_sub = self._validate_interface_info(
|
||||
interface_info)
|
||||
|
@ -589,12 +615,17 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
interface_info['port_id'] = port['id']
|
||||
if 'subnet_id' in interface_info:
|
||||
del interface_info['subnet_id']
|
||||
return super(DFPlugin, self).add_router_interface(
|
||||
context, router_id, interface_info)
|
||||
|
||||
with context.session.begin(subtransactions=True):
|
||||
result = super(DFPlugin, self).add_router_interface(
|
||||
context, router_id, interface_info)
|
||||
return result
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def remove_router_interface(self, context, router_id, interface_info):
|
||||
new_router = super(DFPlugin, self).remove_router_interface(
|
||||
context, router_id, interface_info)
|
||||
with context.session.begin(subtransactions=True):
|
||||
new_router = super(DFPlugin, self).remove_router_interface(
|
||||
context, router_id, interface_info)
|
||||
|
||||
subnet = self.get_subnet(context, new_router['subnet_id'])
|
||||
network_id = subnet['network_id']
|
||||
|
@ -604,9 +635,9 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
network_id,
|
||||
subnet['tenant_id'])
|
||||
except df_exceptions.DBKeyNotFound:
|
||||
LOG.debug("logical router %s is not found in DF DB, suppressing "
|
||||
" delete_lrouter_port exception" % router_id)
|
||||
|
||||
LOG.debug("logical router %s is not found in DF DB, "
|
||||
"suppressing delete_lrouter_port "
|
||||
"exception" % router_id)
|
||||
return new_router
|
||||
|
||||
def _create_dhcp_server_port(self, context, subnet):
|
||||
|
@ -703,37 +734,38 @@ class DFPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
return self._get_ip_from_port(dhcp_port)
|
||||
return None
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def create_floatingip(self, context, floatingip):
|
||||
with context.session.begin(subtransactions=True):
|
||||
floatingip_dict = super(DFPlugin, self).create_floatingip(
|
||||
context, floatingip,
|
||||
initial_status=const.FLOATINGIP_STATUS_DOWN)
|
||||
self.nb_api.create_floatingip(
|
||||
name=floatingip_dict['id'],
|
||||
topic=floatingip_dict['tenant_id'],
|
||||
floating_ip_address=floatingip_dict['floating_ip_address'],
|
||||
floating_network_id=floatingip_dict['floating_network_id'],
|
||||
router_id=floatingip_dict['router_id'],
|
||||
port_id=floatingip_dict['port_id'],
|
||||
fixed_ip_address=floatingip_dict['fixed_ip_address'],
|
||||
status=floatingip_dict['status'])
|
||||
|
||||
self.nb_api.create_floatingip(
|
||||
name=floatingip_dict['id'],
|
||||
topic=floatingip_dict['tenant_id'],
|
||||
floating_ip_address=floatingip_dict['floating_ip_address'],
|
||||
floating_network_id=floatingip_dict['floating_network_id'],
|
||||
router_id=floatingip_dict['router_id'],
|
||||
port_id=floatingip_dict['port_id'],
|
||||
fixed_ip_address=floatingip_dict['fixed_ip_address'],
|
||||
status=floatingip_dict['status'])
|
||||
return floatingip_dict
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def update_floatingip(self, context, id, floatingip):
|
||||
with context.session.begin(subtransactions=True):
|
||||
floatingip_dict = super(DFPlugin, self).update_floatingip(
|
||||
context, id, floatingip)
|
||||
self.nb_api.update_floatingip(
|
||||
name=floatingip_dict['id'],
|
||||
topic=floatingip_dict['tenant_id'],
|
||||
router_id=floatingip_dict['router_id'],
|
||||
port_id=floatingip_dict['port_id'],
|
||||
fixed_ip_address=floatingip_dict['fixed_ip_address'],
|
||||
status=floatingip_dict['status'])
|
||||
|
||||
self.nb_api.update_floatingip(
|
||||
name=floatingip_dict['id'],
|
||||
topic=floatingip_dict['tenant_id'],
|
||||
router_id=floatingip_dict['router_id'],
|
||||
port_id=floatingip_dict['port_id'],
|
||||
fixed_ip_address=floatingip_dict['fixed_ip_address'],
|
||||
status=floatingip_dict['status'])
|
||||
return floatingip_dict
|
||||
|
||||
@lock_db.wrap_db_lock()
|
||||
def delete_floatingip(self, context, id):
|
||||
with context.session.begin(subtransactions=True):
|
||||
floatingip = self.get_floatingip(context, id)
|
||||
|
|
Loading…
Reference in New Issue