A model instance update also sends an update on all referred instances
When an instance is updated/created in the DF local controller, the DF local controller now iterates all references within the instance, and sends an update/create event for all instances referenced by that instance. For instance, logical port P references security group S. Suppose the local controller did not receive the event that S was created (yet). Once the event on P is received, that event cannot be processed until the event that S is created is received. This change adds that queuing behaviour. Related-Bug: #1690775 Related-Bug: #1708178 Change-Id: Ic2ee535c0898b37c200719381f61c954b9ff7ddf
This commit is contained in:
parent
3076ceb35c
commit
ba93ad9a2e
|
@ -30,6 +30,7 @@ class DragonflowException(Exception):
|
|||
try:
|
||||
super(DragonflowException, self).__init__(self.message % kwargs)
|
||||
self.msg = self.message % kwargs
|
||||
self.kwargs = kwargs
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception() as ctxt:
|
||||
if not self.use_fatal_exceptions():
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import itertools
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -23,6 +25,7 @@ from ryu.app.ofctl import service as of_service
|
|||
from ryu.base import app_manager
|
||||
from ryu import cfg as ryu_cfg
|
||||
|
||||
from dragonflow.common import exceptions
|
||||
from dragonflow.common import utils as df_utils
|
||||
from dragonflow import conf as cfg
|
||||
from dragonflow.controller.common import constants as ctrl_const
|
||||
|
@ -34,6 +37,7 @@ from dragonflow.db import api_nb
|
|||
from dragonflow.db import db_common
|
||||
from dragonflow.db import db_store
|
||||
from dragonflow.db import model_framework
|
||||
from dragonflow.db import model_proxy
|
||||
from dragonflow.db.models import core
|
||||
from dragonflow.db.models import l2
|
||||
from dragonflow.db.models import mixins
|
||||
|
@ -50,6 +54,11 @@ class DfLocalController(object):
|
|||
def __init__(self, chassis_name, nb_api):
|
||||
self.db_store = db_store.get_instance()
|
||||
self._queue = queue.PriorityQueue()
|
||||
# pending_id -> (model, pender_id)
|
||||
# 'pending_id' is the ID of the object for which we are waiting.
|
||||
# 'model' and 'pender_id' are the model and the ID of the object
|
||||
# which is waiting for the object described by 'pending_id'
|
||||
self._pending_objects = collections.defaultdict(set)
|
||||
|
||||
self.chassis_name = chassis_name
|
||||
self.nb_api = nb_api
|
||||
|
@ -341,10 +350,110 @@ class DfLocalController(object):
|
|||
self.delete_by_id(model_class, update.key)
|
||||
else:
|
||||
obj = model_class.from_json(update.value)
|
||||
self.update(obj)
|
||||
self._send_updates_for_object(obj)
|
||||
else:
|
||||
LOG.warning('Unfamiliar update: %s', str(update))
|
||||
|
||||
def _get_model(self, obj):
|
||||
if model_proxy.is_model_proxy(obj):
|
||||
return obj.get_proxied_model()
|
||||
return type(obj)
|
||||
|
||||
def _send_updates_for_object(self, obj):
|
||||
try:
|
||||
references = self.get_model_references_deep(obj)
|
||||
except exceptions.ReferencedObjectNotFound as e:
|
||||
proxy = e.kwargs['proxy']
|
||||
reference_id = proxy.id
|
||||
model = self._get_model(obj)
|
||||
self._pending_objects[reference_id].add((model, obj.id))
|
||||
else:
|
||||
queue = itertools.chain(reversed(references), (obj,))
|
||||
self._send_update_events(queue)
|
||||
self._send_pending_events(obj)
|
||||
|
||||
def _send_pending_events(self, obj):
|
||||
try:
|
||||
pending = self._pending_objects.pop(obj.id)
|
||||
except KeyError:
|
||||
return # Nothing to do
|
||||
for model, item_id in pending:
|
||||
lean_obj = model(id=item_id)
|
||||
item = self.nb_api.get(lean_obj)
|
||||
self._send_updates_for_object(item)
|
||||
|
||||
def get_model_references_deep(self, obj):
|
||||
"""
|
||||
Return a tuple of all model instances referenced by the given model
|
||||
instance, including indirect references. e.g. if an lport references
|
||||
a network, and that network references a QoS policy, then both are
|
||||
returned in the iterator.
|
||||
|
||||
Raises a ReferencedObjectNotFound exception if a referenced model
|
||||
cannot be found in the db_store or NB DB.
|
||||
|
||||
:param obj: Model instance
|
||||
:type obj: model_framework.ModelBase
|
||||
:return: iterator
|
||||
:raises: exceptions.ReferencedObjectNotFound
|
||||
"""
|
||||
return tuple(self.iter_model_references_deep(obj))
|
||||
|
||||
def iter_model_references_deep(self, obj):
|
||||
"""
|
||||
Return an iterator on all model instances referenced by the given model
|
||||
instance, including indirect references. e.g. if an lport references
|
||||
a network, and that network references a QoS policy, then both are
|
||||
returned in the iterator.
|
||||
|
||||
Raises a ReferencedObjectNotFound exception upon a referenced model
|
||||
cannot be found in the db_store or NB DB. The exception is thrown
|
||||
when that object is reached by the iterator, allowing other objects
|
||||
to be processed. If you need to verify all objects can be referenced,
|
||||
use 'get_model_references_deep'.
|
||||
|
||||
:param obj: Model instance
|
||||
:type obj: model_framework.ModelBase
|
||||
:return: iterator
|
||||
:raises: exceptions.ReferencedObjectNotFound
|
||||
"""
|
||||
seen = set()
|
||||
queue = collections.deque((obj,))
|
||||
while queue:
|
||||
item = queue.pop()
|
||||
if item.id in seen:
|
||||
continue
|
||||
seen.add(item.id)
|
||||
if model_proxy.is_model_proxy(item):
|
||||
# NOTE(oanson) _dereference raises an exception for unknown
|
||||
# objectes, i.e. objects not found in the NB DB.
|
||||
item = self._dereference(item)
|
||||
yield item
|
||||
for submodel in item.iter_submodels():
|
||||
queue.append(submodel)
|
||||
|
||||
def _dereference(self, reference):
|
||||
"""
|
||||
Dereference a model proxy object. Return first from the db_store, and
|
||||
if it is not there, from the NB DB. Raise a ReferencedObjectNotFound
|
||||
exception if it is not in the NB DB either.
|
||||
|
||||
:param reference: Model instance
|
||||
:type reference: model_framework.ModelBase
|
||||
:return: iterator
|
||||
:raises: exceptions.ReferencedObjectNotFound
|
||||
"""
|
||||
item = reference.get_object()
|
||||
if item is None:
|
||||
item = self.nb_api.get(reference)
|
||||
if item is None:
|
||||
raise exceptions.ReferencedObjectNotFound(proxy=reference)
|
||||
return item
|
||||
|
||||
def _send_update_events(self, iterable):
|
||||
for instance in iterable:
|
||||
self.update(instance)
|
||||
|
||||
|
||||
def _has_basic_events(obj):
|
||||
return isinstance(obj, mixins.BasicEvents)
|
||||
|
|
|
@ -42,6 +42,11 @@ def _normalize_tuple(v):
|
|||
return tuple(v)
|
||||
|
||||
|
||||
def is_submodel(instance):
|
||||
return (isinstance(instance, ModelBase) or
|
||||
hasattr(instance, 'get_object'))
|
||||
|
||||
|
||||
class _CommonBase(models.Base):
|
||||
'''Base class for extending jsonmodels' Base
|
||||
|
||||
|
@ -269,6 +274,16 @@ class _CommonBase(models.Base):
|
|||
if isinstance(subobj, ModelBase):
|
||||
yield subobj
|
||||
|
||||
def iter_submodels(self):
|
||||
for name, field in self.iterate_over_set_fields():
|
||||
member = getattr(self, name)
|
||||
if isinstance(member, list):
|
||||
for instance in member:
|
||||
if is_submodel(instance):
|
||||
yield instance
|
||||
elif is_submodel(member):
|
||||
yield member
|
||||
|
||||
@classmethod
|
||||
def get_index(cls, index):
|
||||
return cls.get_indexes()[index]
|
||||
|
|
|
@ -16,6 +16,7 @@ from oslo_config import cfg
|
|||
from dragonflow.controller import df_local_controller
|
||||
from dragonflow.controller import ryu_base_app
|
||||
from dragonflow.db import db_store
|
||||
from dragonflow.db import field_types as df_fields
|
||||
from dragonflow.db import model_framework
|
||||
from dragonflow.db.models import core
|
||||
from dragonflow.db.models import mixins
|
||||
|
@ -164,3 +165,42 @@ class DfLocalControllerTestCase(test_app_base.DFAppTestBase):
|
|||
get_one.return_value = None
|
||||
self.controller.delete_model_object(None)
|
||||
delete.assert_not_called()
|
||||
|
||||
def test_iter_references_deep(self):
|
||||
|
||||
@model_framework.register_model
|
||||
@model_framework.construct_nb_db_model
|
||||
class LocalReffedModel(model_framework.ModelBase):
|
||||
table_name = 'LocalReffedModel'
|
||||
pass
|
||||
|
||||
@model_framework.register_model
|
||||
@model_framework.construct_nb_db_model
|
||||
class LocalReffingModel(model_framework.ModelBase):
|
||||
table_name = 'LocalReffingModel'
|
||||
ref1 = df_fields.ReferenceField(LocalReffedModel)
|
||||
|
||||
@model_framework.register_model
|
||||
@model_framework.construct_nb_db_model
|
||||
class LocalListReffingModel(model_framework.ModelBase):
|
||||
table_name = 'LocalListReffingModel'
|
||||
ref2 = df_fields.ReferenceListField(LocalReffingModel)
|
||||
|
||||
models = {}
|
||||
nb_api_mocker = mock.patch.object(self.controller, 'nb_api')
|
||||
nb_api = nb_api_mocker.start()
|
||||
self.addCleanup(nb_api_mocker.stop)
|
||||
nb_api.get = lambda m: models[m.id]
|
||||
models['3'] = LocalReffedModel(id='3')
|
||||
ref1 = LocalReffingModel(id='2', ref1='3')
|
||||
models['2'] = ref1
|
||||
models['5'] = LocalReffedModel(id='5')
|
||||
ref2 = LocalReffingModel(id='4', ref1='5')
|
||||
models['4'] = ref2
|
||||
model = LocalListReffingModel(id='1', ref2=[ref1, ref2])
|
||||
models['1'] = model
|
||||
references = [inst.id for inst in
|
||||
self.controller.iter_model_references_deep(model)]
|
||||
self.assertItemsEqual(['2', '3', '4', '5'], references)
|
||||
self.assertLess(references.index('2'), references.index('3'))
|
||||
self.assertLess(references.index('4'), references.index('5'))
|
||||
|
|
|
@ -487,3 +487,8 @@ class TestModelFramework(tests_base.BaseTestCase):
|
|||
sorted_models.index(ReffingModel3)
|
||||
)
|
||||
self.assertIn(ReffedModel, ReffingModel3.dependencies())
|
||||
|
||||
def test_iter_submodels(self):
|
||||
model = EmbeddingModel(id='1', embedded=ModelTest(id='2'))
|
||||
submodels = [inst.id for inst in model.iter_submodels()]
|
||||
self.assertEqual(['2'], submodels)
|
||||
|
|
Loading…
Reference in New Issue