Integration of (Distributed) Port Binding OVO

This patch integrates the PortBinding and DistributedPortBinding OVOs
into neutron/plugins/ml2/db.py and neutron/plugins/ml2/plugin.py.

Co-Authored-By: Artur Korzeniewski <artur.korzeniewski@intel.com>
Change-Id: Idb76c0cb2a4d66690c9aca5ba338d5df814cd21e
Partially-Implements: blueprint adopt-oslo-versioned-objects-for-db
Author: Lujin
Date: 2016-12-07 07:16:00 +01:00
Commit: febeaf5d40 (parent: 3f1a9846d2)
10 changed files with 264 additions and 200 deletions
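
At a high level, the patch swaps direct SQLAlchemy model manipulation for versioned-object calls. A minimal before/after sketch of the recurring pattern (condensed from the add_port_binding change below; imports as used in ml2/db.py):

    from neutron_lib.api.definitions import portbindings
    from neutron.objects import ports as port_obj
    from neutron.plugins.ml2 import models

    # before: build the SQLAlchemy model and add it to the session
    record = models.PortBinding(
        port_id=port_id, vif_type=portbindings.VIF_TYPE_UNBOUND)
    context.session.add(record)

    # after: build the OVO and let it persist itself
    binding = port_obj.PortBinding(
        context, port_id=port_id, vif_type=portbindings.VIF_TYPE_UNBOUND)
    binding.create()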


@ -27,6 +27,7 @@ from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import exception as obj_exception
from oslo_versionedobjects import fields as obj_fields
import six
from sqlalchemy import exc as sql_exc
from neutron._i18n import _
from neutron.db import api as db_api
@ -303,7 +304,11 @@ def _detach_db_obj(func):
# TODO(ihrachys) consider refreshing just changed attributes
self.obj_context.session.refresh(self.db_obj)
# detach the model so that consequent fetches don't reuse it
self.obj_context.session.expunge(self.db_obj)
try:
self.obj_context.session.expunge(self.db_obj)
except sql_exc.InvalidRequestError:
# already detached
pass
return res
return decorator
@ -330,6 +335,8 @@ class DeclarativeObject(abc.ABCMeta):
if key in cls.fields or key in cls.obj_extra_fields:
fields_no_update_set.add(key)
cls.fields_no_update = list(fields_no_update_set)
if name in ('PortBinding', 'DistributedPortBinding'):
cls.fields_no_update.remove('host')
model = getattr(cls, 'db_model', None)
if model:
@ -480,7 +487,12 @@ class NeutronDbObject(NeutronObject):
obj = cls(context)
obj.from_db_object(db_obj)
# detach the model so that consequent fetches don't reuse it
context.session.expunge(obj.db_obj)
# TODO(lujinluo): remove the try block when Port OVO is in place.
try:
context.session.expunge(obj.db_obj)
except sql_exc.InvalidRequestError:
# already detached
pass
return obj
def obj_load_attr(self, attrname):


@ -36,6 +36,22 @@ class PortBindingBase(base.NeutronDbObject):
'Port': {'port_id': 'id'},
}
def update(self):
"""Override to handle host update in Port Binding.
Delete old Port Binding entry, update the hostname and create new
Port Binding with all values saved in DB.
This is done due to host being a primary key, and OVO is not able
to update primary key fields.
"""
if self.db_obj and self.host != self.db_obj.host:
with self.obj_context.session.begin(subtransactions=True):
old_obj = self._load_object(self.obj_context, self.db_obj)
old_obj.delete()
self._changed_fields = set(self.fields.keys())
self.create()
else:
super(PortBindingBase, self).update()
@classmethod
def modify_fields_to_db(cls, fields):
result = super(PortBindingBase, cls).modify_fields_to_db(fields)
@ -69,7 +85,7 @@ class PortBinding(PortBindingBase):
fields = {
'port_id': common_types.UUIDField(),
'host': obj_fields.StringField(),
'host': obj_fields.StringField(default=''),
'profile': common_types.DictOfMiscValuesField(),
'vif_type': obj_fields.StringField(),
'vif_details': common_types.DictOfMiscValuesField(nullable=True),
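
In caller terms, the overridden update() makes a host change look like an ordinary field update; a usage sketch mirroring what the ML2 plugin now does (the binding is fetched for its previous host first):

    binding = port_obj.PortBinding.get_object(
        context, port_id=port_id, host=old_host)
    binding.host = new_host
    # host is a primary key field, so update() removes the old DB row and
    # re-creates the binding with the new host and the same remaining values
    binding.update()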


@ -18,8 +18,8 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.objects import exceptions
from neutron_lib.plugins import directory
from oslo_db import exception as db_exc
from oslo_log import log
from oslo_utils import uuidutils
import six
@ -31,6 +31,7 @@ from neutron.db import api as db_api
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.objects import ports as port_obj
from neutron.objects import utils as obj_utils
from neutron.plugins.ml2 import models
from neutron.services.segments import exceptions as seg_exc
@ -42,11 +43,10 @@ MAX_PORTS_PER_QUERY = 500
@db_api.context_manager.writer
def add_port_binding(context, port_id):
record = models.PortBinding(
port_id=port_id,
vif_type=portbindings.VIF_TYPE_UNBOUND)
context.session.add(record)
return record
binding = port_obj.PortBinding(
context, port_id=port_id, vif_type=portbindings.VIF_TYPE_UNBOUND)
binding.create()
return binding
@db_api.context_manager.writer
@ -91,35 +91,32 @@ def clear_binding_levels(context, port_id, host):
def ensure_distributed_port_binding(context, port_id, host, router_id=None):
with db_api.context_manager.reader.using(context):
record = (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).first())
if record:
return record
binding_obj = port_obj.DistributedPortBinding.get_object(
context, port_id=port_id, host=host)
if binding_obj:
return binding_obj
try:
with db_api.context_manager.writer.using(context):
record = models.DistributedPortBinding(
port_id=port_id,
host=host,
router_id=router_id,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vnic_type=portbindings.VNIC_NORMAL,
status=n_const.PORT_STATUS_DOWN)
context.session.add(record)
return record
except db_exc.DBDuplicateEntry:
binding_obj = port_obj.DistributedPortBinding(
context,
port_id=port_id,
host=host,
router_id=router_id,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vnic_type=portbindings.VNIC_NORMAL,
status=n_const.PORT_STATUS_DOWN)
binding_obj.create()
return binding_obj
except exceptions.NeutronDbObjectDuplicateEntry:
LOG.debug("Distributed Port %s already bound", port_id)
with db_api.context_manager.reader.using(context):
return (context.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id, host=host).one())
return port_obj.DistributedPortBinding.get_object(
context, port_id=port_id, host=host)
def delete_distributed_port_binding_if_stale(context, binding):
if not binding.router_id and binding.status == n_const.PORT_STATUS_DOWN:
with db_api.context_manager.writer.using(context):
LOG.debug("Distributed port: Deleting binding %s", binding)
context.session.delete(binding)
LOG.debug("Distributed port: Deleting binding %s", binding)
binding.delete()
def get_port(context, port_id):
@ -212,29 +209,27 @@ def make_port_dict_with_security_groups(port, sec_groups):
def get_port_binding_host(context, port_id):
try:
with db_api.context_manager.reader.using(context):
query = (context.session.query(models.PortBinding).
filter(models.PortBinding.port_id.startswith(port_id)).
one())
except exc.NoResultFound:
binding = port_obj.PortBinding.get_objects(
context, port_id=obj_utils.StringStarts(port_id))
if not binding:
LOG.debug("No binding found for port %(port_id)s",
{'port_id': port_id})
return
except exc.MultipleResultsFound:
if len(binding) > 1:
LOG.error("Multiple ports have port_id starting with %s",
port_id)
return
return query.host
return binding[0].host
@db_api.context_manager.reader
def generate_distributed_port_status(context, port_id):
# an OR'ed value of status assigned to parent port from the
# distributedportbinding bucket
query = context.session.query(models.DistributedPortBinding)
final_status = n_const.PORT_STATUS_BUILD
for bind in query.filter(models.DistributedPortBinding.port_id == port_id):
bindings = port_obj.DistributedPortBinding.get_objects(context,
port_id=port_id)
for bind in bindings:
if bind.status == n_const.PORT_STATUS_ACTIVE:
return bind.status
elif bind.status == n_const.PORT_STATUS_DOWN:
@ -243,10 +238,10 @@ def generate_distributed_port_status(context, port_id):
def get_distributed_port_binding_by_host(context, port_id, host):
with db_api.context_manager.reader.using(context):
binding = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(port_id),
models.DistributedPortBinding.host == host).first())
bindings = port_obj.DistributedPortBinding.get_objects(
context, port_id=obj_utils.StringStarts(port_id), host=host)
binding = bindings.pop() if bindings else None
if not binding:
LOG.debug("No binding for distributed port %(port_id)s with host "
"%(host)s", {'port_id': port_id, 'host': host})
@ -254,10 +249,8 @@ def get_distributed_port_binding_by_host(context, port_id, host):
def get_distributed_port_bindings(context, port_id):
with db_api.context_manager.reader.using(context):
bindings = (context.session.query(models.DistributedPortBinding).
filter(models.DistributedPortBinding.port_id.startswith(
port_id)).all())
bindings = port_obj.DistributedPortBinding.get_objects(
context, port_id=obj_utils.StringStarts(port_id))
if not bindings:
LOG.debug("No bindings for distributed port %s", port_id)
return bindings
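
The prefix lookups that were previously expressed as SQLAlchemy startswith() filters now go through the object layer with a matching filter object, as in the helpers above; roughly:

    from neutron.objects import utils as obj_utils

    bindings = port_obj.DistributedPortBinding.get_objects(
        context, port_id=obj_utils.StringStarts(port_id))
    # StringStarts is turned into a LIKE 'prefix%' clause by the object
    # query machinery, matching the behaviour of the old startswith() query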


@ -17,7 +17,6 @@ from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.plugins.ml2 import api
from oslo_log import log
from oslo_serialization import jsonutils
import sqlalchemy
from neutron.db import segments_db
@ -124,9 +123,7 @@ class PortContext(MechanismDriverContext, api.PortContext):
else:
self._network_context = NetworkContext(
plugin, plugin_context, network) if network else None
# NOTE(kevinbenton): InstanceSnapshot can go away once we are working
# with OVO objects instead of native SQLA objects.
self._binding = InstanceSnapshot(binding)
self._binding = binding
self._binding_levels = [InstanceSnapshot(l)
for l in (binding_levels or [])]
self._segments_to_bind = None
@ -295,7 +292,7 @@ class PortContext(MechanismDriverContext, api.PortContext):
# TODO(rkukura) Verify binding allowed, segment in network
self._new_bound_segment = segment_id
self._binding.vif_type = vif_type
self._binding.vif_details = jsonutils.dumps(vif_details)
self._binding.vif_details = vif_details
self._new_port_status = status
def continue_binding(self, segment_id, next_segments_to_bind):


@ -125,6 +125,6 @@ class DistributedPortBinding(model_base.BASEV2):
models_v2.Port,
load_on_pending=True,
backref=orm.backref("distributed_port_binding",
lazy='subquery',
lazy='joined',
cascade='delete'))
revises_on_change = ('port', )


@ -82,6 +82,7 @@ from neutron.db import subnet_service_type_db_models as service_type_db
from neutron.db import vlantransparent_db
from neutron.extensions import providernet as provider
from neutron.extensions import vlantransparent
from neutron.objects import ports as obj_port
from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import db
@ -314,7 +315,6 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
port = mech_context.current
port_id = port['id']
changes = False
host = const.ATTR_NOT_SPECIFIED
if attrs and portbindings.HOST_ID in attrs:
host = attrs.get(portbindings.HOST_ID) or ''
@ -338,8 +338,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if profile not in (None, const.ATTR_NOT_SPECIFIED,
self._get_profile(binding)):
binding.profile = jsonutils.dumps(profile)
if len(binding.profile) > models.BINDING_PROFILE_LEN:
binding.profile = profile
if (len(jsonutils.dumps(binding.profile)) >
models.BINDING_PROFILE_LEN):
msg = _("binding:profile value too large")
raise exc.InvalidInput(error_message=msg)
changes = True
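
A side effect of keeping binding.profile as a dict on the object (instead of a pre-serialized JSON string) is that the size guard must serialize it before comparing against the column limit, and readers must tolerate both forms until every path uses objects (see the _get_profile change further down). A rough sketch; _profile_as_dict is a hypothetical helper for illustration only:

    # writing: profile is a dict, so measure its serialized length
    binding.profile = profile
    if len(jsonutils.dumps(binding.profile)) > models.BINDING_PROFILE_LEN:
        raise exc.InvalidInput(
            error_message=_("binding:profile value too large"))

    # reading: accept either a dict (OVO) or a serialized string (legacy model)
    def _profile_as_dict(value):
        if isinstance(value, dict):
            return value
        return jsonutils.loads(value) if value else {}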
@ -347,7 +348,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# Unbind the port if needed.
if changes:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.vif_details = None
binding.update()
db.clear_binding_levels(plugin_context, port_id, original_host)
mech_context._clear_binding_levels()
port['status'] = const.PORT_STATUS_DOWN
@ -357,13 +359,14 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
if port['device_owner'] == const.DEVICE_OWNER_DVR_INTERFACE:
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
binding.vif_details = ''
binding.vif_details = None
db.clear_binding_levels(plugin_context, port_id, original_host)
mech_context._clear_binding_levels()
binding.host = ''
binding.update()
self._update_port_dict_binding(port, binding)
binding.persist_state_to_session(plugin_context.session)
binding.update()
return changes
@db_api.retry_db_errors
@ -435,12 +438,15 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# transaction.
port = orig_context.current
orig_binding = orig_context._binding
new_binding = models.PortBinding(
profile = orig_binding.profile or {}
new_binding = obj_port.PortBinding(
orig_context._plugin_context,
port_id=orig_binding.port_id,
host=orig_binding.host,
vnic_type=orig_binding.vnic_type,
profile=orig_binding.profile,
profile=profile,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vif_details=''
vif_details=None
)
self._update_port_dict_binding(port, new_binding)
new_context = driver_context.PortContext(
@ -477,7 +483,13 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# mechanism driver update_port_*commit() calls.
try:
port_db = self._get_port(plugin_context, port_id)
cur_binding = port_db.port_binding
plugin_context.session.refresh(port_db)
# TODO(korzen) replace get_objects with port_obj.binding when
# Port OVO is integrated in _get_port
bindings = obj_port.PortBinding.get_objects(
plugin_context, port_id=port_db.id,
status=const.ACTIVE)
cur_binding = bindings.pop() if bindings else None
except exc.PortNotFound:
port_db, cur_binding = None, None
if not port_db or not cur_binding:
@ -544,10 +556,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
cur_binding.host)
db.set_binding_levels(plugin_context,
bind_context._binding_levels)
# refresh context with a snapshot of updated state
cur_context._binding = driver_context.InstanceSnapshot(
cur_binding)
cur_context._binding = cur_binding
cur_context._binding_levels = bind_context._binding_levels
cur_binding.update()
plugin_context.session.refresh(port_db)
# Update PortContext's port dictionary to reflect the
# updated binding state.
@ -598,6 +610,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _get_vif_details(self, binding):
if binding.vif_details:
try:
# TODO(lujinluo): remove isinstance check once we switch to
# objects for all operations.
if isinstance(binding.vif_details, dict):
return binding.vif_details
return jsonutils.loads(binding.vif_details)
except Exception:
LOG.error("Serialized vif_details DB value '%(value)s' "
@ -609,6 +625,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _get_profile(self, binding):
if binding.profile:
try:
# TODO(lujinluo): remove isinstance check once we switch to
# objects for all operations.
if isinstance(binding.profile, dict):
return binding.profile
return jsonutils.loads(binding.profile)
except Exception:
LOG.error("Serialized profile DB value '%(value)s' for "
@ -1292,7 +1312,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_port=original_port)
with db_api.context_manager.writer.using(context):
port_db = self._get_port(context, id)
binding = port_db.port_binding
context.session.refresh(port_db)
# TODO(korzen) replace get_objects with port_obj.binding when
# Port OVO is integrated in _get_port
bindings = obj_port.PortBinding.get_objects(
context, port_id=port_db.id)
binding = bindings.pop() if bindings else None
if not binding:
raise exc.PortNotFound(port_id=id)
mac_address_updated = self._check_mac_update_allowed(
@ -1431,19 +1456,21 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
binding = mech_context._binding
port = mech_context.current
port_id = port['id']
clear_host = None
if binding.vif_type != portbindings.VIF_TYPE_UNBOUND:
binding.vif_details = ''
binding.vif_details = None
binding.vif_type = portbindings.VIF_TYPE_UNBOUND
if binding.host:
db.clear_binding_levels(plugin_context, port_id, binding.host)
binding.host = ''
clear_host = ''
self._update_port_dict_binding(port, binding)
binding.host = attrs and attrs.get(portbindings.HOST_ID)
new_host = attrs and attrs.get(portbindings.HOST_ID) or clear_host
binding.router_id = attrs and attrs.get('device_id')
# merge into session to reflect changes
binding.persist_state_to_session(plugin_context.session)
if new_host:
binding.host = new_host
binding.update()
@utils.transaction_guard
@db_api.retry_if_session_inactive()
@ -1514,7 +1541,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
with db_api.context_manager.writer.using(context):
try:
port_db = self._get_port(context, id)
binding = port_db.port_binding
# TODO(korzen) replace get_objects with port_obj.binding when
# Port OVO is integrated in _get_port
bindings = obj_port.PortBinding.get_objects(
context, port_id=port_db.id)
binding = bindings.pop() if bindings else None
except exc.PortNotFound:
LOG.debug("The port '%s' was deleted", id)
return
@ -1758,6 +1789,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return
if binding.status != status:
binding.status = status
binding.update()
updated = True
if (updated and


@ -62,7 +62,7 @@ object_data = {
'NetworkPortSecurity': '1.0-b30802391a87945ee9c07582b4ff95e3',
'NetworkSegment': '1.0-57b7f2960971e3b95ded20cbc59244a8',
'Port': '1.1-5bf48d12a7bf7f5b7a319e8003b437a5',
'PortBinding': '1.0-3306deeaa6deb01e33af06777d48d578',
'PortBinding': '1.0-0ad9727c4e72d609d5b4f70bcd3bc727',
'PortBindingLevel': '1.0-de66a4c61a083b8f34319fa9dde5b060',
'PortDataPlaneStatus': '1.0-25be74bda46c749653a10357676c0ab2',
'PortDNS': '1.1-c5ca2dc172bdd5fafee3fc986d1d7023',


@ -23,7 +23,6 @@ from neutron_lib import context
from neutron_lib.plugins.ml2 import api
from oslo_utils import uuidutils
from sqlalchemy.orm import exc
from sqlalchemy.orm import query
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
@ -33,7 +32,6 @@ from neutron.db import segments_db
from neutron.objects import network as network_obj
from neutron.objects import ports as port_obj
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import models
from neutron.tests.unit import testlib_api
@ -64,10 +62,8 @@ class Ml2DBTestCase(testlib_api.SqlTestCase):
return port
def _setup_neutron_portbinding(self, port_id, vif_type, host):
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.add(models.PortBinding(port_id=port_id,
vif_type=vif_type,
host=host))
port_obj.PortBinding(
self.ctx, port_id=port_id, vif_type=vif_type, host=host).create()
@staticmethod
def _sort_segments(segments):
@ -318,44 +314,45 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
def _setup_distributed_binding(self, network_id,
port_id, router_id, host_id):
with db_api.context_manager.writer.using(self.ctx):
record = models.DistributedPortBinding(
port_id=port_id,
host=host_id,
router_id=router_id,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vnic_type=portbindings.VNIC_NORMAL,
status='DOWN')
self.ctx.session.add(record)
return record
binding_obj = port_obj.DistributedPortBinding(
self.ctx,
port_id=port_id,
host=host_id,
router_id=router_id,
vif_type=portbindings.VIF_TYPE_UNBOUND,
vnic_type=portbindings.VNIC_NORMAL,
status='DOWN')
binding_obj.create()
return binding_obj
def test_ensure_distributed_port_binding_deals_with_db_duplicate(self):
network_id = uuidutils.generate_uuid()
port_id = uuidutils.generate_uuid()
router_id = 'foo_router_id'
host_id = 'foo_host_id'
router_id = uuidutils.generate_uuid()
host_id = uuidutils.generate_uuid()
self._setup_neutron_network(network_id, [port_id])
self._setup_distributed_binding(network_id, port_id,
router_id, host_id)
with mock.patch.object(query.Query, 'first') as query_first:
query_first.return_value = []
with mock.patch.object(ml2_db.LOG, 'debug') as log_trace:
binding = ml2_db.ensure_distributed_port_binding(
self.ctx, port_id, host_id, router_id)
self.assertTrue(query_first.called)
self.assertTrue(log_trace.called)
dpb = self._setup_distributed_binding(network_id, port_id,
router_id, host_id)
with mock.patch.object(port_obj.DistributedPortBinding,
'get_object') as get_object:
get_object.side_effect = [None, dpb]
binding = ml2_db.ensure_distributed_port_binding(
self.ctx, port_id, host_id, router_id)
self.assertTrue(get_object.called)
self.assertEqual(port_id, binding.port_id)
def test_ensure_distributed_port_binding(self):
network_id = uuidutils.generate_uuid()
port_id = uuidutils.generate_uuid()
self._setup_neutron_network(network_id, [port_id])
expected_port_id = uuidutils.generate_uuid()
self._setup_neutron_network(network_id, [expected_port_id])
router = self._setup_neutron_router()
ml2_db.ensure_distributed_port_binding(
self.ctx, port_id, 'foo_host', router.id)
expected = (self.ctx.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id).one())
self.assertEqual(port_id, expected.port_id)
self.ctx, expected_port_id, 'foo_host', router.id)
actual_objs = port_obj.DistributedPortBinding.get_objects(
self.ctx, port_id=expected_port_id)
self.assertEqual(1, len(actual_objs))
actual_obj = actual_objs.pop()
self.assertEqual(expected_port_id, actual_obj.port_id)
def test_ensure_distributed_port_binding_multiple_bindings(self):
network_id = uuidutils.generate_uuid()
@ -366,9 +363,9 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
self.ctx, port_id, 'foo_host_1', router.id)
ml2_db.ensure_distributed_port_binding(
self.ctx, port_id, 'foo_host_2', router.id)
bindings = (self.ctx.session.query(models.DistributedPortBinding).
filter_by(port_id=port_id).all())
self.assertEqual(2, len(bindings))
count_objs = port_obj.DistributedPortBinding.count(
self.ctx, port_id=port_id)
self.assertEqual(2, count_objs)
def test_delete_distributed_port_binding_if_stale(self):
network_id = uuidutils.generate_uuid()
@ -377,21 +374,23 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
binding = self._setup_distributed_binding(
network_id, port_id, None, 'foo_host_id')
ml2_db.delete_distributed_port_binding_if_stale(self.ctx,
binding)
count = (self.ctx.session.query(models.DistributedPortBinding).
filter_by(port_id=binding.port_id).count())
self.assertFalse(count)
ml2_db.delete_distributed_port_binding_if_stale(self.ctx, binding)
obj_exists = port_obj.DistributedPortBinding.objects_exist(
self.ctx, port_id=binding.port_id)
self.assertFalse(obj_exists)
def test_get_distributed_port_binding_by_host_not_found(self):
port_id = uuidutils.generate_uuid()
host_id = uuidutils.generate_uuid()
port = ml2_db.get_distributed_port_binding_by_host(
self.ctx, 'foo_port_id', 'foo_host_id')
self.ctx, port_id, host_id)
self.assertIsNone(port)
def test_get_distributed_port_bindings_not_found(self):
port = ml2_db.get_distributed_port_bindings(self.ctx,
'foo_port_id')
self.assertFalse(len(port))
uuidutils.generate_uuid())
self.assertEqual(0, len(port))
def test_get_distributed_port_bindings(self):
network_id = uuidutils.generate_uuid()
@ -412,8 +411,9 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
network_obj.Network(self.ctx, id=network_id).create()
with db_api.context_manager.writer.using(self.ctx):
device_owner = constants.DEVICE_OWNER_DVR_INTERFACE
port_id = uuidutils.generate_uuid()
port = models_v2.Port(
id='port_id',
id=port_id,
network_id=network_id,
mac_address='00:11:22:33:44:55',
admin_state_up=True,
@ -421,23 +421,20 @@ class Ml2DvrDBTestCase(testlib_api.SqlTestCase):
device_id='device_id',
device_owner=device_owner)
self.ctx.session.add(port)
binding_kwarg = {
'port_id': 'port_id',
'host': 'host',
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL,
'router_id': 'router_id',
'status': constants.PORT_STATUS_DOWN
}
self.ctx.session.add(models.DistributedPortBinding(
**binding_kwarg))
binding_kwarg['host'] = 'another-host'
self.ctx.session.add(models.DistributedPortBinding(
**binding_kwarg))
binding_kwarg = {
'port_id': port_id,
'host': 'host',
'vif_type': portbindings.VIF_TYPE_UNBOUND,
'vnic_type': portbindings.VNIC_NORMAL,
'router_id': 'router_id',
'status': constants.PORT_STATUS_DOWN
}
port_obj.DistributedPortBinding(self.ctx, **binding_kwarg).create()
binding_kwarg['host'] = 'another-host'
port_obj.DistributedPortBinding(self.ctx, **binding_kwarg).create()
with warnings.catch_warnings(record=True) as warning_list:
with db_api.context_manager.writer.using(self.ctx):
self.ctx.session.delete(port)
self.assertEqual([], warning_list)
ports = ml2_db.get_distributed_port_bindings(self.ctx,
'port_id')
self.assertEqual(0, len(ports))
bindings = ml2_db.get_distributed_port_bindings(self.ctx, port_id)
self.assertEqual(0, len(bindings))


@ -46,6 +46,7 @@ from neutron.db import provisioning_blocks
from neutron.db import segments_db
from neutron.extensions import multiprovidernet as mpnet
from neutron.objects import base as base_obj
from neutron.objects import ports as obj_port
from neutron.objects import router as l3_obj
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import db as ml2_db
@ -1629,9 +1630,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
# create a port and delete it so we have an expired mechanism context
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(self.context,
port['port']['id']).port_binding
binding['host'] = 'test'
binding = obj_port.PortBinding.get_object(
self.context, port_id=port['port']['id'], host='')
binding.host = 'test'
binding.update()
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
@ -1650,10 +1652,11 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def _create_port_and_bound_context(self, port_vif_type, bound_vif_type):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'fake_host'
binding = obj_port.PortBinding.get_object(
self.context, port_id=port['port']['id'], host='')
binding.host = 'fake_host'
binding['vif_type'] = port_vif_type
binding.update()
# Generates port context to be used before the bind.
port_context = driver_context.PortContext(
plugin, self.context, port['port'],
@ -1765,10 +1768,10 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_update_port_binding_host_id_none(self):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
with self.context.session.begin(subtransactions=True):
binding.host = 'test'
binding = obj_port.PortBinding.get_object(
self.context, port_id=port['port']['id'], host='')
binding.host = 'test'
binding.update()
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
@ -1779,15 +1782,18 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
self.assertEqual('test', binding.host)
with self.context.session.begin(subtransactions=True):
plugin._process_port_binding(mech_context, attrs)
updated_binding = obj_port.PortBinding.get_objects(self.context,
port_id=port['port']['id']).pop()
self.assertTrue(update_mock.mock_calls)
self.assertEqual('', binding.host)
self.assertEqual('', updated_binding.host)
def test_update_port_binding_host_id_not_changed(self):
with self.port() as port:
plugin = directory.get_plugin()
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'test'
binding = obj_port.PortBinding.get_object(
self.context, port_id=port['port']['id'], host='')
binding.host = 'test'
binding.update()
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context, port['port']['network_id']),
@ -1800,30 +1806,34 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
self.assertEqual('test', binding.host)
def test_process_distributed_port_binding_update_router_id(self):
host_id = 'host'
binding = models.DistributedPortBinding(
port_id='port_id',
host=host_id,
router_id='old_router_id',
vif_type=portbindings.VIF_TYPE_OVS,
vnic_type=portbindings.VNIC_NORMAL,
status=constants.PORT_STATUS_DOWN)
plugin = directory.get_plugin()
mock_network = {'id': 'net_id'}
mock_port = {'id': 'port_id'}
ctxt = context.get_admin_context()
new_router_id = 'new_router'
attrs = {'device_id': new_router_id, portbindings.HOST_ID: host_id}
with mock.patch.object(plugin, '_update_port_dict_binding'):
with mock.patch.object(segments_db, 'get_network_segments',
return_value=[]):
mech_context = driver_context.PortContext(
self, ctxt, mock_port, mock_network, binding, None)
plugin._process_distributed_port_binding(mech_context,
ctxt, attrs)
self.assertEqual(new_router_id,
mech_context._binding.router_id)
self.assertEqual(host_id, mech_context._binding.host)
with self.port() as port:
host_id = 'host'
ctxt = context.get_admin_context()
binding_obj = obj_port.DistributedPortBinding(
ctxt,
port_id=port['port']['id'],
host=host_id,
profile={},
router_id='old_router_id',
vif_type=portbindings.VIF_TYPE_OVS,
vnic_type=portbindings.VNIC_NORMAL,
status=constants.PORT_STATUS_DOWN)
binding_obj.create()
plugin = directory.get_plugin()
mock_network = {'id': 'net_id'}
mock_port = {'id': 'port_id'}
new_router_id = 'new_router'
attrs = {'device_id': new_router_id, portbindings.HOST_ID: host_id}
with mock.patch.object(plugin, '_update_port_dict_binding'):
with mock.patch.object(segments_db, 'get_network_segments',
return_value=[]):
mech_context = driver_context.PortContext(
self, ctxt, mock_port, mock_network, binding_obj, None)
plugin._process_distributed_port_binding(mech_context,
ctxt, attrs)
self.assertEqual(new_router_id,
mech_context._binding.router_id)
self.assertEqual(host_id, mech_context._binding.host)
def test_update_distributed_port_binding_on_concurrent_port_delete(self):
plugin = directory.get_plugin()
@ -1854,9 +1864,20 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test__bind_port_original_port_set(self):
plugin = directory.get_plugin()
plugin.mechanism_manager = mock.Mock()
mock_port = {'id': 'port_id'}
mock_port = {'id': uuidutils.generate_uuid()}
context = mock.Mock()
binding_obj = obj_port.DistributedPortBinding(
mock.MagicMock(),
port_id=mock_port['id'],
host='vm_host',
profile={},
router_id='old_router_id',
vif_type='',
vnic_type=portbindings.VNIC_NORMAL,
status=constants.PORT_STATUS_DOWN)
binding_obj.create()
context.network.current = {'id': 'net_id'}
context._binding = binding_obj
context.original = mock_port
with mock.patch.object(plugin, '_update_port_dict_binding'), \
mock.patch.object(segments_db, 'get_network_segments',
@ -2532,13 +2553,15 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
def test_update_distributed_router_interface_port(self):
"""Test validate distributed router interface update succeeds."""
host_id = 'host'
binding = models.DistributedPortBinding(
port_id='port_id',
host=host_id,
router_id='old_router_id',
vif_type=portbindings.VIF_TYPE_OVS,
vnic_type=portbindings.VNIC_NORMAL,
status=constants.PORT_STATUS_DOWN)
binding_obj = obj_port.DistributedPortBinding(
mock.MagicMock(),
port_id=uuidutils.generate_uuid(),
host=host_id,
router_id='old_router_id',
vif_type=portbindings.VIF_TYPE_OVS,
vnic_type=portbindings.VNIC_NORMAL,
status=constants.PORT_STATUS_DOWN)
binding_obj.create()
with mock.patch.object(
mech_test.TestMechanismDriver,
'update_port_postcommit',
@ -2548,7 +2571,7 @@ class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
'update_port_precommit') as port_pre,\
mock.patch.object(
ml2_db, 'get_distributed_port_bindings') as dist_bindings:
dist_bindings.return_value = [binding]
dist_bindings.return_value = [binding_obj]
port_pre.return_value = True
with self.network() as network:
with self.subnet(network=network) as subnet:
@ -2771,9 +2794,10 @@ class TestML2Segments(Ml2PluginV2TestCase):
# add writer here to make sure that the following operations are
# performed in the same session
with db_api.context_manager.writer.using(self.context):
binding = plugin._get_port(
self.context, port['port']['id']).port_binding
binding['host'] = 'host-ovs-no_filter'
binding = obj_port.PortBinding.get_object(
self.context, port_id=port['port']['id'], host='')
binding.host = 'host-ovs-no_filter'
binding.update()
mech_context = driver_context.PortContext(
plugin, self.context, port['port'],
plugin.get_network(self.context,


@ -19,11 +19,10 @@ from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_serialization import jsonutils
from neutron.conf.plugins.ml2.drivers import driver_type
from neutron.objects import ports as obj_port
from neutron.plugins.ml2 import driver_context
from neutron.plugins.ml2 import models as ml2_models
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
@ -111,10 +110,8 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
ctx = context.get_admin_context()
with self.port(name='name') as port:
# emulating concurrent binding deletion
with ctx.session.begin():
for item in (ctx.session.query(ml2_models.PortBinding).
filter_by(port_id=port['port']['id'])):
ctx.session.delete(item)
obj_port.PortBinding.delete_objects(
ctx, port_id=port['port']['id'])
self.assertIsNone(
self.plugin.get_bound_port_context(ctx, port['port']['id']))
@ -191,13 +188,9 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
attrs['binding:host_id'] = 'host2'
updated_port = attrs.copy()
network = {'id': attrs['network_id']}
binding = ml2_models.PortBinding(
port_id=original_port['id'],
host=original_port['binding:host_id'],
vnic_type=original_port['binding:vnic_type'],
profile=jsonutils.dumps(original_port['binding:profile']),
vif_type=original_port['binding:vif_type'],
vif_details=original_port['binding:vif_details'])
binding = obj_port.PortBinding.get_object(
ctx, port_id=original_port['id'],
host=original_port['binding:host_id'])
levels = []
mech_context = driver_context.PortContext(
plugin, ctx, updated_port, network, binding, levels,