Merge "Lazy load of resources in resource cache"
This commit is contained in:
commit
934641e59e
|
@ -36,6 +36,9 @@ class RemoteResourceCache(object):
|
|||
self.resource_types = resource_types
|
||||
self._cache_by_type_and_id = {rt: {} for rt in self.resource_types}
|
||||
self._deleted_ids_by_type = {rt: set() for rt in self.resource_types}
|
||||
# track everything we've asked the server so we don't ask again
|
||||
self._satisfied_server_queries = set()
|
||||
self._puller = resources_rpc.ResourcesPullRpcApi()
|
||||
|
||||
def _type_cache(self, rtype):
|
||||
if rtype not in self.resource_types:
|
||||
|
@ -45,22 +48,37 @@ class RemoteResourceCache(object):
|
|||
def start_watcher(self):
|
||||
self._watcher = RemoteResourceWatcher(self)
|
||||
|
||||
def bulk_flood_cache(self):
|
||||
# TODO(kevinbenton): this fills the cache with *every* server record.
|
||||
# make this more intelligent to only pull ports on host, then all
|
||||
# networks and security groups for those ports, etc.
|
||||
context = n_ctx.get_admin_context()
|
||||
puller = resources_rpc.ResourcesPullRpcApi()
|
||||
for rtype in self.resource_types:
|
||||
for resource in puller.bulk_pull(context, rtype):
|
||||
self._type_cache(rtype)[resource.id] = resource
|
||||
|
||||
def get_resource_by_id(self, rtype, obj_id):
|
||||
"""Returns None if it doesn't exist."""
|
||||
if obj_id in self._deleted_ids_by_type[rtype]:
|
||||
return None
|
||||
cached_item = self._type_cache(rtype).get(obj_id)
|
||||
if cached_item:
|
||||
return cached_item
|
||||
# try server in case object existed before agent start
|
||||
self._flood_cache_for_query(rtype, id=obj_id)
|
||||
return self._type_cache(rtype).get(obj_id)
|
||||
|
||||
def _flood_cache_for_query(self, rtype, **filter_kwargs):
|
||||
"""Load info from server for first query.
|
||||
|
||||
Queries the server if this is the first time a given query for
|
||||
rtype has been issued.
|
||||
"""
|
||||
query_id = (rtype, ) + tuple(sorted(filter_kwargs.items()))
|
||||
if query_id in self._satisfied_server_queries:
|
||||
# we've already asked the server this question so we don't
|
||||
# ask again because any updates will have been pushed to us
|
||||
return
|
||||
context = n_ctx.get_admin_context()
|
||||
for resource in self._puller.bulk_pull(context, rtype,
|
||||
filter_kwargs=filter_kwargs):
|
||||
self._type_cache(rtype)[resource.id] = resource
|
||||
self._satisfied_server_queries.add(query_id)
|
||||
|
||||
def get_resources(self, rtype, filters):
|
||||
"""Find resources that match key:value in filters dict."""
|
||||
self._flood_cache_for_query(rtype, **filters)
|
||||
match = lambda o: all(v == getattr(o, k) for k, v in filters.items())
|
||||
return self.match_resources_with_func(rtype, match)
|
||||
|
||||
|
|
|
@ -169,9 +169,6 @@ def create_cache_for_l2_agent():
|
|||
]
|
||||
rcache = resource_cache.RemoteResourceCache(resource_types)
|
||||
rcache.start_watcher()
|
||||
# TODO(kevinbenton): ensure flood uses filters or that this has a long
|
||||
# timeout before Pike release.
|
||||
rcache.bulk_flood_cache()
|
||||
return rcache
|
||||
|
||||
|
||||
|
|
|
@ -46,20 +46,7 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
|
|||
self.duck = OVOLikeThing(2)
|
||||
self.ctx = context.get_admin_context()
|
||||
self.rcache = resource_cache.RemoteResourceCache(rtypes)
|
||||
|
||||
def test_bulk_flood_cache(self):
|
||||
with mock.patch('neutron.api.rpc.handlers.resources_rpc.'
|
||||
'ResourcesPullRpcApi') as rpc_pull_mock:
|
||||
pull = rpc_pull_mock.return_value.bulk_pull
|
||||
pull.side_effect = [[self.duck], [self.goose]]
|
||||
self.rcache.bulk_flood_cache()
|
||||
pull.assert_any_call(mock.ANY, 'duck')
|
||||
pull.assert_any_call(mock.ANY, 'goose')
|
||||
self.assertEqual(self.goose,
|
||||
self.rcache.get_resource_by_id('goose', 1))
|
||||
self.assertEqual(self.duck,
|
||||
self.rcache.get_resource_by_id('duck', 2))
|
||||
self.assertIsNone(self.rcache.get_resource_by_id('duck', 1))
|
||||
self._pullmock = mock.patch.object(self.rcache, '_puller').start()
|
||||
|
||||
def test_get_resource_by_id(self):
|
||||
self.rcache.record_resource_update(self.ctx, 'goose', self.goose)
|
||||
|
@ -67,6 +54,17 @@ class RemoteResourceCacheTestCase(base.BaseTestCase):
|
|||
self.rcache.get_resource_by_id('goose', 1))
|
||||
self.assertIsNone(self.rcache.get_resource_by_id('goose', 2))
|
||||
|
||||
def test__flood_cache_for_query_pulls_once(self):
|
||||
self.rcache._flood_cache_for_query('goose', id=66)
|
||||
self._pullmock.bulk_pull.assert_called_once_with(
|
||||
mock.ANY, 'goose', filter_kwargs={'id': 66})
|
||||
self._pullmock.bulk_pull.reset_mock()
|
||||
self.rcache._flood_cache_for_query('goose', id=66)
|
||||
self.assertFalse(self._pullmock.called)
|
||||
self.rcache._flood_cache_for_query('goose', id=67)
|
||||
self._pullmock.bulk_pull.assert_called_once_with(
|
||||
mock.ANY, 'goose', filter_kwargs={'id': 67})
|
||||
|
||||
def test_get_resources(self):
|
||||
geese = [OVOLikeThing(3, size='large'), OVOLikeThing(5, size='medium'),
|
||||
OVOLikeThing(4, size='large'), OVOLikeThing(6, size='small')]
|
||||
|
|
Loading…
Reference in New Issue