Merge "Lazy load of resources in resource cache"

This commit is contained in:
Jenkins 2017-06-16 00:15:54 +00:00 committed by Gerrit Code Review
commit 934641e59e
3 changed files with 40 additions and 27 deletions

View File

@ -36,6 +36,9 @@ class RemoteResourceCache(object):
self.resource_types = resource_types
self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
# track everything we've asked the server so we don't ask again
self._satisfied_server_queries = set()
self._puller = resources_rpc.ResourcesPullRpcApi()
def _type_cache(self, rtype):
if rtype not in self.resource_types:
@ -45,22 +48,37 @@ class RemoteResourceCache(object):
def start_watcher(self):
self._watcher = RemoteResourceWatcher(self)
def bulk_flood_cache(self):
# TODO(kevinbenton): this fills the cache with *every* server record.
# make this more intelligent to only pull ports on host, then all
# networks and security groups for those ports, etc.
context = n_ctx.get_admin_context()
puller = resources_rpc.ResourcesPullRpcApi()
for rtype in self.resource_types:
for resource in puller.bulk_pull(context, rtype):
self._type_cache(rtype)[resource.id] = resource
def get_resource_by_id(self, rtype, obj_id):
"""Returns None if it doesn't exist."""
if obj_id in self._deleted_ids_by_type[rtype]:
return None
cached_item = self._type_cache(rtype).get(obj_id)
if cached_item:
return cached_item
# try server in case object existed before agent start
self._flood_cache_for_query(rtype, id=obj_id)
return self._type_cache(rtype).get(obj_id)
def _flood_cache_for_query(self, rtype, **filter_kwargs):
"""Load info from server for first query.
Queries the server if this is the first time a given query for
rtype has been issued.
"""
query_id = (rtype, ) + tuple(sorted(filter_kwargs.items()))
if query_id in self._satisfied_server_queries:
# we've already asked the server this question so we don't
# ask again because any updates will have been pushed to us
return
context = n_ctx.get_admin_context()
for resource in self._puller.bulk_pull(context, rtype,
filter_kwargs=filter_kwargs):
self._type_cache(rtype)[resource.id] = resource
self._satisfied_server_queries.add(query_id)
def get_resources(self, rtype, filters):
"""Find resources that match key:value in filters dict."""
self._flood_cache_for_query(rtype, **filters)
match = lambda o: all(v == getattr(o, k) for k, v in filters.items())
return self.match_resources_with_func(rtype, match)

View File

@ -169,9 +169,6 @@ def create_cache_for_l2_agent():
]
rcache = resource_cache.RemoteResourceCache(resource_types)
rcache.start_watcher()
# TODO(kevinbenton): ensure flood uses filters or that this has a long
# timeout before Pike release.
rcache.bulk_flood_cache()
return rcache

View File

@ -46,20 +46,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
self.duck = OVOLikeThing(2)
self.ctx = context.get_admin_context()
self.rcache = resource_cache.RemoteResourceCache(rtypes)
def test_bulk_flood_cache(self):
with mock.patch('neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi') as rpc_pull_mock:
pull = rpc_pull_mock.return_value.bulk_pull
pull.side_effect = [[self.duck], [self.goose]]
self.rcache.bulk_flood_cache()
pull.assert_any_call(mock.ANY, 'duck')
pull.assert_any_call(mock.ANY, 'goose')
self.assertEqual(self.goose,
self.rcache.get_resource_by_id('goose', 1))
self.assertEqual(self.duck,
self.rcache.get_resource_by_id('duck', 2))
self.assertIsNone(self.rcache.get_resource_by_id('duck', 1))
self._pullmock = mock.patch.object(self.rcache, '_puller').start()
def test_get_resource_by_id(self):
self.rcache.record_resource_update(self.ctx, 'goose', self.goose)
@ -67,6 +54,17 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
self.rcache.get_resource_by_id('goose', 1))
self.assertIsNone(self.rcache.get_resource_by_id('goose', 2))
def test__flood_cache_for_query_pulls_once(self):
self.rcache._flood_cache_for_query('goose', id=66)
self._pullmock.bulk_pull.assert_called_once_with(
mock.ANY, 'goose', filter_kwargs={'id': 66})
self._pullmock.bulk_pull.reset_mock()
self.rcache._flood_cache_for_query('goose', id=66)
self.assertFalse(self._pullmock.called)
self.rcache._flood_cache_for_query('goose', id=67)
self._pullmock.bulk_pull.assert_called_once_with(
mock.ANY, 'goose', filter_kwargs={'id': 67})
def test_get_resources(self):
geese = [OVOLikeThing(3, size='large'), OVOLikeThing(5, size='medium'),
OVOLikeThing(4, size='large'), OVOLikeThing(6, size='small')]