diff --git a/neutron/agent/resource_cache.py b/neutron/agent/resource_cache.py index c83f0b77f8e..e031c01cb0a 100644 --- a/neutron/agent/resource_cache.py +++ b/neutron/agent/resource_cache.py @@ -36,6 +36,9 @@ class RemoteResourceCache(object): self.resource_types = resource_types self._cache_by_type_and_id = {rt: {} for rt in self.resource_types} self._deleted_ids_by_type = {rt: set() for rt in self.resource_types} + # track everything we've asked the server so we don't ask again + self._satisfied_server_queries = set() + self._puller = resources_rpc.ResourcesPullRpcApi() def _type_cache(self, rtype): if rtype not in self.resource_types: @@ -45,22 +48,37 @@ class RemoteResourceCache(object): def start_watcher(self): self._watcher = RemoteResourceWatcher(self) - def bulk_flood_cache(self): - # TODO(kevinbenton): this fills the cache with *every* server record. - # make this more intelligent to only pull ports on host, then all - # networks and security groups for those ports, etc. - context = n_ctx.get_admin_context() - puller = resources_rpc.ResourcesPullRpcApi() - for rtype in self.resource_types: - for resource in puller.bulk_pull(context, rtype): - self._type_cache(rtype)[resource.id] = resource - def get_resource_by_id(self, rtype, obj_id): """Returns None if it doesn't exist.""" + if obj_id in self._deleted_ids_by_type[rtype]: + return None + cached_item = self._type_cache(rtype).get(obj_id) + if cached_item: + return cached_item + # try server in case object existed before agent start + self._flood_cache_for_query(rtype, id=obj_id) return self._type_cache(rtype).get(obj_id) + def _flood_cache_for_query(self, rtype, **filter_kwargs): + """Load info from server for first query. + + Queries the server if this is the first time a given query for + rtype has been issued. + """ + query_id = (rtype, ) + tuple(sorted(filter_kwargs.items())) + if query_id in self._satisfied_server_queries: + # we've already asked the server this question so we don't + # ask again because any updates will have been pushed to us + return + context = n_ctx.get_admin_context() + for resource in self._puller.bulk_pull(context, rtype, + filter_kwargs=filter_kwargs): + self._type_cache(rtype)[resource.id] = resource + self._satisfied_server_queries.add(query_id) + def get_resources(self, rtype, filters): """Find resources that match key:value in filters dict.""" + self._flood_cache_for_query(rtype, **filters) match = lambda o: all(v == getattr(o, k) for k, v in filters.items()) return self.match_resources_with_func(rtype, match) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index d0810885339..a9e056ec8ce 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -169,9 +169,6 @@ def create_cache_for_l2_agent(): ] rcache = resource_cache.RemoteResourceCache(resource_types) rcache.start_watcher() - # TODO(kevinbenton): ensure flood uses filters or that this has a long - # timeout before Pike release. - rcache.bulk_flood_cache() return rcache diff --git a/neutron/tests/unit/agent/test_resource_cache.py b/neutron/tests/unit/agent/test_resource_cache.py index 5fa867952d0..63b4d3a22e6 100644 --- a/neutron/tests/unit/agent/test_resource_cache.py +++ b/neutron/tests/unit/agent/test_resource_cache.py @@ -46,20 +46,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): self.duck = OVOLikeThing(2) self.ctx = context.get_admin_context() self.rcache = resource_cache.RemoteResourceCache(rtypes) - - def test_bulk_flood_cache(self): - with mock.patch('neutron.api.rpc.handlers.resources_rpc.' - 'ResourcesPullRpcApi') as rpc_pull_mock: - pull = rpc_pull_mock.return_value.bulk_pull - pull.side_effect = [[self.duck], [self.goose]] - self.rcache.bulk_flood_cache() - pull.assert_any_call(mock.ANY, 'duck') - pull.assert_any_call(mock.ANY, 'goose') - self.assertEqual(self.goose, - self.rcache.get_resource_by_id('goose', 1)) - self.assertEqual(self.duck, - self.rcache.get_resource_by_id('duck', 2)) - self.assertIsNone(self.rcache.get_resource_by_id('duck', 1)) + self._pullmock = mock.patch.object(self.rcache, '_puller').start() def test_get_resource_by_id(self): self.rcache.record_resource_update(self.ctx, 'goose', self.goose) @@ -67,6 +54,17 @@ class RemoteResourceCacheTestCase(base.BaseTestCase): self.rcache.get_resource_by_id('goose', 1)) self.assertIsNone(self.rcache.get_resource_by_id('goose', 2)) + def test__flood_cache_for_query_pulls_once(self): + self.rcache._flood_cache_for_query('goose', id=66) + self._pullmock.bulk_pull.assert_called_once_with( + mock.ANY, 'goose', filter_kwargs={'id': 66}) + self._pullmock.bulk_pull.reset_mock() + self.rcache._flood_cache_for_query('goose', id=66) + self.assertFalse(self._pullmock.called) + self.rcache._flood_cache_for_query('goose', id=67) + self._pullmock.bulk_pull.assert_called_once_with( + mock.ANY, 'goose', filter_kwargs={'id': 67}) + def test_get_resources(self): geese = [OVOLikeThing(3, size='large'), OVOLikeThing(5, size='medium'), OVOLikeThing(4, size='large'), OVOLikeThing(6, size='small')]