Merge "Add required database API(s) for cache"

This commit is contained in:
Zuul 2024-02-27 14:59:26 +00:00 committed by Gerrit Code Review
commit 7b40c33cb6
4 changed files with 629 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
DATA = {
'cached_images': {},
'images': {},
'members': [],
'metadef_namespace_resource_types': [],
@ -39,6 +40,7 @@ DATA = {
'metadef_properties': [],
'metadef_resource_types': [],
'metadef_tags': [],
'node_reference': {},
'tags': {},
'locations': [],
'tasks': {},
@ -76,6 +78,7 @@ def configure():
def reset():
global DATA
DATA = {
'cached_images': {},
'images': {},
'members': [],
'metadef_namespace_resource_types': [],
@ -84,6 +87,7 @@ def reset():
'metadef_properties': [],
'metadef_resource_types': [],
'metadef_tags': [],
'node_reference': {},
'tags': {},
'locations': [],
'tasks': {},
@ -2136,3 +2140,162 @@ def _get_metadef_id():
global INDEX
INDEX += 1
return INDEX
def _cached_image_format(cached_image):
"""Format a cached image for consumption outside of this module"""
image_dict = {
'id': cached_image['id'],
'image_id': cached_image['image_id'],
'last_accessed': cached_image['last_accessed'].timestamp(),
'last_modified': cached_image['last_modified'].timestamp(),
'size': cached_image['size'],
'hits': cached_image['hits'],
'checksum': cached_image['checksum']
}
return image_dict
@log_call
def node_reference_get_by_url(context, node_reference_url):
    """Return the first node reference matching *node_reference_url*.

    :raises exception.NotFound: when no stored reference has that url
    """
    global DATA
    match = next(
        (ref for ref in DATA['node_reference'].values()
         if ref['node_reference_url'] == node_reference_url),
        None)
    if match is None:
        raise exception.NotFound()
    return match
@log_call
def node_reference_create(context, node_reference_url, **values):
    """Register a node reference; raise Duplicate if the id is taken.

    The id may be supplied via the ``node_reference_id`` kwarg and
    defaults to 1.
    """
    global DATA
    ref_id = values.get('node_reference_id', 1)
    if ref_id in DATA['node_reference']:
        raise exception.Duplicate()
    record = {
        'node_reference_id': ref_id,
        'node_reference_url': node_reference_url,
    }
    DATA['node_reference'][ref_id] = record
    return record
@log_call
def get_hit_count(context, image_id, node_reference_url):
    """Return the hit count of a cached image, or 0 when not cached.

    NOTE(review): node_reference_url is accepted for interface parity
    with the SQLAlchemy driver but is not used to filter here — confirm
    this is intentional for the simple driver.
    """
    global DATA
    try:
        record = DATA['cached_images'][image_id]
    except KeyError:
        return 0
    return _cached_image_format(record)['hits']
@log_call
def get_cached_images(context, node_reference_url):
    """Return formatted cache records for every image on the node.

    :raises exception.NotFound: if the node reference url is unknown
    """
    global DATA
    node = node_reference_get_by_url(context, node_reference_url)
    node_id = node['node_reference_id']
    return [
        _cached_image_format(record)
        for record in DATA['cached_images'].values()
        if record['node_reference_id'] == node_id
    ]
@log_call
def delete_all_cached_images(context, node_reference_url):
    """Delete every cache record belonging to the given node.

    :raises exception.NotFound: if the node reference url is unknown
    """
    global DATA
    node_reference = node_reference_get_by_url(context, node_reference_url)
    # Snapshot the keys so entries can be deleted while scanning.
    # (The original had a doubled assignment `all_images = all_images = ...`.)
    all_images = tuple(DATA['cached_images'].keys())
    for image_id in all_images:
        if DATA['cached_images'][image_id]['node_reference_id'] == \
                node_reference['node_reference_id']:
            del DATA['cached_images'][image_id]
@log_call
def delete_cached_image(context, image_id, node_reference_url):
    """Remove the cache record for *image_id* on the given node, if any.

    :raises exception.NotFound: if the node reference url is unknown
    """
    global DATA
    node = node_reference_get_by_url(context, node_reference_url)
    node_id = node['node_reference_id']
    # Snapshot the keys so deletion during the scan is safe.
    for key in tuple(DATA['cached_images'].keys()):
        record = DATA['cached_images'][key]
        if (record['node_reference_id'] == node_id
                and record['image_id'] == image_id):
            del DATA['cached_images'][key]
            break
@log_call
def get_least_recently_accessed(context, node_reference_url):
    """Return the image_id of one cached image on the node, or None.

    NOTE(review): unlike the SQLAlchemy driver, this does not order by
    last_accessed; it returns the first cached image found — confirm
    that is acceptable for the simple driver.
    """
    global DATA
    cached = get_cached_images(context, node_reference_url)
    return cached[0]['image_id'] if cached else None
@log_call
def is_image_cached_for_node(context, node_reference_url, image_id):
    """Return True if *image_id* is cached on the node given by url.

    :raises exception.NotFound: if the node reference url is unknown
    """
    global DATA
    node_reference = node_reference_get_by_url(context, node_reference_url)
    all_images = DATA['cached_images']
    # NOTE: iterate with a distinct name. The original loop reused
    # `image_id` as the loop variable, shadowing the parameter, so the
    # image comparison was a tautology and the function returned True
    # whenever ANY image was cached on the node.
    for key in all_images:
        record = all_images[key]
        if (record['node_reference_id'] ==
                node_reference['node_reference_id'] and
                record['image_id'] == image_id):
            return True
    return False
@log_call
def insert_cache_details(context, node_reference_url, image_id,
                         size, checksum=None, last_accessed=None,
                         last_modified=None, hits=None):
    """Record that *image_id* is cached on the node given by url.

    :raises exception.NotFound: if the node reference url is unknown
    :raises exception.Duplicate: if the image already has a cache record
    """
    global DATA
    node_reference = node_reference_get_by_url(context, node_reference_url)
    if image_id in DATA['cached_images']:
        raise exception.Duplicate()
    accessed = last_accessed or timeutils.utcnow()
    modified = last_modified or timeutils.utcnow()
    DATA['cached_images'][image_id] = {
        'last_accessed': accessed,
        'last_modified': modified,
        'node_reference_id': node_reference['node_reference_id'],
        'checksum': checksum,
        'image_id': image_id,
        'size': size,
        'hits': hits or 0,
        'id': str(uuid.uuid4()),
    }
@log_call
def update_hit_count(context, image_id, node_reference_url):
    """Increment hits and refresh last_accessed for a cached image.

    :raises exception.NotFound: if the node reference url is unknown
    """
    global DATA
    last_hit_count = get_hit_count(context, image_id, node_reference_url)
    node_reference = node_reference_get_by_url(context, node_reference_url)
    all_images = DATA['cached_images']
    values = {
        'hits': last_hit_count + 1,
        'last_accessed': timeutils.utcnow(),
    }
    # NOTE: iterate with a distinct name. The original reused `image_id`
    # as the loop variable, shadowing the parameter, so the first image
    # cached on the node was updated instead of the requested one.
    for key in all_images:
        record = all_images[key]
        if (record['node_reference_id'] ==
                node_reference['node_reference_id'] and
                record['image_id'] == image_id):
            record.update(values)
            break

View File

@@ -2369,3 +2369,214 @@ def metadef_tag_count(context, namespace_name):
session = get_session()
with session.begin():
return metadef_tag_api.count(context, session, namespace_name)
def _cached_image_format(cached_image):
"""Format a cached image for consumption outside of this module"""
image_dict = {
'id': cached_image['id'],
'image_id': cached_image['image_id'],
'last_accessed': cached_image['last_accessed'].timestamp(),
'last_modified': cached_image['last_modified'].timestamp(),
'size': cached_image['size'],
'hits': cached_image['hits'],
'checksum': cached_image['checksum']
}
return image_dict
def node_reference_get_by_url(context, node_reference_url):
    """Get a node reference by node reference url.

    :raises exception.NotFound: if no row matches the url
    """
    session = get_session()
    with session.begin():
        try:
            query = session.query(models.NodeReference)
            query = query.filter_by(node_reference_url=node_reference_url)
            return query.one()
        except sa_orm.exc.NoResultFound:
            # Interpolate AFTER translation: the original interpolated
            # inside _(), so the message-catalog lookup used an
            # already-substituted string and could never match.
            msg = _("The node reference %s"
                    " was not found.") % node_reference_url
            LOG.debug(msg)
            raise exception.NotFound(msg)
@utils.no_4byte_params
def node_reference_create(context, node_reference_url):
    """Create a node_reference or raise Duplicate if it already exists."""
    session = get_session()
    with session.begin():
        node_reference = models.NodeReference()
        node_reference.update({'node_reference_url': node_reference_url})
        try:
            node_reference.save(session=session)
        except db_exception.DBDuplicateEntry:
            raise exception.Duplicate()
    return node_reference
def get_hit_count(context, image_id, node_reference_url):
    """Return the hit count for *image_id* on the given node, or 0.

    A missing row means the image is not cached there yet, so 0 is
    returned instead of raising.
    """
    session = get_session()
    node_id = models.NodeReference.node_reference_id
    filters = [
        models.CachedImages.image_id == image_id,
        models.NodeReference.node_reference_url == node_reference_url,
    ]
    with session.begin():
        try:
            query = session.query(
                models.CachedImages.hits).join(
                models.NodeReference,
                node_id == models.CachedImages.node_reference_id,
                isouter=True).filter(sa_sql.and_(*filters))
            return query.one()[0]
        except sa_orm.exc.NoResultFound:
            # Interpolate after translation so the i18n catalog lookup
            # sees the raw format string (the original interpolated
            # inside _()).
            msg = _("Referenced %s is not cached on"
                    " %s.") % (image_id, node_reference_url)
            LOG.debug(msg)
            # NOTE(abhishekk): Since image is not cached yet, assuming
            # hit count as 0
            return 0
def get_cached_images(context, node_reference_url):
    """Return formatted cache rows for every image cached on the node."""
    node_id = models.NodeReference.node_reference_id
    session = get_session()
    with session.begin():
        query = session.query(
            models.CachedImages).join(
            models.NodeReference,
            node_id == models.CachedImages.node_reference_id,
            isouter=True).filter(
            models.NodeReference.node_reference_url == node_reference_url)
        # Format each row inside the transaction while it is attached.
        return [_cached_image_format(row) for row in query.all()]
@utils.no_4byte_params
def delete_all_cached_images(context, node_reference_url):
    """Delete every CachedImages row belonging to the given node."""
    session = get_session()
    with session.begin():
        # Resolve the node id as a subquery so a single DELETE suffices.
        node_id = session.query(
            models.NodeReference.node_reference_id).filter(
            models.NodeReference.node_reference_url == node_reference_url
        ).scalar_subquery()
        session.query(models.CachedImages).filter_by(
            node_reference_id=node_id).delete(synchronize_session=False)
def delete_cached_image(context, image_id, node_reference_url):
    """Delete the CachedImages row for *image_id* on the given node."""
    session = get_session()
    with session.begin():
        # Resolve the owning node id inline; one DELETE handles the rest.
        node_id = session.query(
            models.NodeReference.node_reference_id).filter(
            models.NodeReference.node_reference_url == node_reference_url
        ).scalar_subquery()
        session.query(models.CachedImages).filter(
            models.CachedImages.image_id == image_id).filter_by(
            node_reference_id=node_id).delete(synchronize_session=False)
def get_least_recently_accessed(context, node_reference_url):
    """Return the image_id with the oldest last_accessed, or None."""
    node_id = models.NodeReference.node_reference_id
    session = get_session()
    with session.begin():
        query = session.query(
            models.CachedImages.image_id).join(
            models.NodeReference,
            node_id == models.CachedImages.node_reference_id,
            isouter=True).filter(
            models.NodeReference.node_reference_url == node_reference_url)
        row = query.order_by(models.CachedImages.last_accessed).first()
        if row is None:
            # There are no more cached images
            return None
        return row[0]
def is_image_cached_for_node(context, node_reference_url, image_id):
    """Return True if *image_id* has a cache record on the given node."""
    node_id = models.NodeReference.node_reference_id
    filters = [
        models.CachedImages.image_id == image_id,
        models.NodeReference.node_reference_url == node_reference_url,
    ]
    session = get_session()
    with session.begin():
        try:
            query = session.query(
                models.CachedImages.id).join(
                models.NodeReference,
                node_id == models.CachedImages.node_reference_id,
                isouter=True).filter(sa_sql.and_(*filters))
            if query.one()[0]:
                return True
        except sa_orm.exc.NoResultFound:
            # Interpolate after translation so the i18n catalog lookup
            # sees the raw format string (the original interpolated
            # inside _()).
            msg = _("Referenced %s is not cached on"
                    " %s.") % (image_id, node_reference_url)
            LOG.debug(msg)
    return False
@utils.no_4byte_params
def insert_cache_details(context, node_reference_url, image_id,
                         filesize, checksum=None, last_accessed=None,
                         last_modified=None, hits=None):
    """Insert a CachedImages row for *image_id* on the given node.

    Duplicate inserts are logged and ignored so racing cache workers
    do not fail.

    :raises exception.NotFound: if the node reference url is unknown
    """
    node_reference = node_reference_get_by_url(context, node_reference_url)
    session = get_session()
    accessed = last_accessed or timeutils.utcnow()
    modified = last_modified or timeutils.utcnow()
    values = {
        'image_id': image_id,
        'size': filesize,
        'last_accessed': accessed,
        'last_modified': modified,
        'hits': hits or 0,
        'checksum': checksum,
        'node_reference_id': node_reference['node_reference_id']
    }
    with session.begin():
        cached_image = models.CachedImages()
        cached_image.update(values.copy())
        try:
            cached_image.save(session=session)
        except db_exception.DBDuplicateEntry:
            # Interpolate after translation so the i18n catalog lookup
            # sees the raw format string (the original interpolated
            # inside _()).
            msg = _("Cache entry for %s for %s"
                    " already exists.") % (image_id, node_reference_url)
            LOG.debug(msg)
@utils.no_4byte_params
def update_hit_count(context, image_id, node_reference_url):
    """Bump hits and refresh last_accessed for a cached image in one UPDATE."""
    session = get_session()
    now = timeutils.utcnow()
    with session.begin():
        # Resolve the node id as a subquery so the UPDATE stays single-shot.
        node_id = session.query(
            models.NodeReference.node_reference_id).filter(
            models.NodeReference.node_reference_url == node_reference_url
        ).scalar_subquery()
        session.query(models.CachedImages).filter(
            models.CachedImages.image_id == image_id).filter_by(
            node_reference_id=node_id).update(
            {'hits': models.CachedImages.hits + 1,
             'last_accessed': now},
            synchronize_session='fetch')

View File

@@ -174,6 +174,128 @@ class TestMetadefSqlAlchemyDriver(base_metadef.TestMetadefDriver,
self.addCleanup(db_tests.reset)
class TestImageCacheOperations(base.TestDriver,
                               base.FunctionalInitWrapper):
    """Exercise the centralized-cache DB API against a real driver.

    Fixture layout built in setUp:
      * two active images of size 100
      * two node references: 'node_url_1' and 'node_url_2'
      * both images cached on 'node_url_1' with hits=3; nothing is
        cached on 'node_url_2'
    """

    def setUp(self):
        db_tests.load(get_db, reset_db)
        super(TestImageCacheOperations, self).setUp()
        self.addCleanup(db_tests.reset)
        # Create two images
        self.images = []
        for num in range(0, 2):
            size = 100
            image = self.db_api.image_create(
                self.adm_context,
                {'status': 'active',
                 'owner': self.adm_context.owner,
                 'size': size,
                 'name': 'test-%s-%i' % ('active', num)})
            self.images.append(image)
        # Create two node_references
        self.node_references = [
            self.db_api.node_reference_create(
                self.adm_context, 'node_url_1'),
            self.db_api.node_reference_create(
                self.adm_context, 'node_url_2'),
        ]
        # Cache two images on node_url_1
        for node in self.node_references:
            if node['node_reference_url'] == 'node_url_2':
                continue
            for image in self.images:
                self.db_api.insert_cache_details(
                    self.adm_context, 'node_url_1',
                    image['id'], image['size'], hits=3)

    def test_node_reference_get_by_url(self):
        # Lookup by URL returns the matching node reference row.
        node_reference = self.db_api.node_reference_get_by_url(
            self.adm_context, 'node_url_1')
        self.assertEqual('node_url_1',
                         node_reference['node_reference_url'])

    def test_node_reference_get_by_url_not_found(self):
        # An unknown URL raises NotFound.
        self.assertRaises(exception.NotFound,
                          self.db_api.node_reference_get_by_url,
                          self.adm_context,
                          'garbage_url')

    def test_get_cached_images(self):
        # Two images are cached on node 'node_url_1'
        cached_images = self.db_api.get_cached_images(
            self.adm_context, 'node_url_1')
        self.assertEqual(2, len(cached_images))
        # Nothing is cached on node 'node_url_2'
        cached_images = self.db_api.get_cached_images(
            self.adm_context, 'node_url_2')
        self.assertEqual(0, len(cached_images))

    def test_get_hit_count(self):
        # Hit count will be 3 for image on node_url_1
        hit_count = self.db_api.get_hit_count(
            self.adm_context, self.images[0]['id'], 'node_url_1')
        self.assertEqual(3, hit_count)
        # Hit count will be 0 for image on node_url_2
        hit_count = self.db_api.get_hit_count(
            self.adm_context, self.images[0]['id'], 'node_url_2')
        self.assertEqual(0, hit_count)

    def test_delete_all_cached_images(self):
        # delete all images from node_url_1
        self.db_api.delete_all_cached_images(
            self.adm_context, 'node_url_1')
        # Verify all images are deleted
        cached_images = self.db_api.get_cached_images(
            self.adm_context, 'node_url_1')
        self.assertEqual(0, len(cached_images))

    def test_delete_cached_image(self):
        # Delete cached image from node_url_1
        self.db_api.delete_cached_image(
            self.adm_context, self.images[0]['id'], 'node_url_1')
        # verify that image is deleted
        self.assertFalse(self.db_api.is_image_cached_for_node(
            self.adm_context, 'node_url_1', self.images[0]['id']))

    def test_get_least_recently_accessed(self):
        recently_accessed = self.db_api.get_least_recently_accessed(
            self.adm_context, 'node_url_1')
        # Verify we get last cached image in response
        self.assertEqual(self.images[0]['id'], recently_accessed)

    def test_is_image_cached_for_node(self):
        # Verify image is cached for node_url_1
        self.assertTrue(self.db_api.is_image_cached_for_node(
            self.adm_context, 'node_url_1', self.images[0]['id']))
        # Verify image is not cached for node_url_2
        self.assertFalse(self.db_api.is_image_cached_for_node(
            self.adm_context, 'node_url_2', self.images[0]['id']))

    def test_update_hit_count(self):
        # Verify image on node_url_1 has 3 as hit count
        hit_count = self.db_api.get_hit_count(
            self.adm_context, self.images[0]['id'], 'node_url_1')
        self.assertEqual(3, hit_count)
        # Update the hit count of UUID1
        self.db_api.update_hit_count(
            self.adm_context, self.images[0]['id'], 'node_url_1')
        # Verify hit count is now 4
        hit_count = self.db_api.get_hit_count(
            self.adm_context, self.images[0]['id'], 'node_url_1')
        self.assertEqual(4, hit_count)
class TestImageAtomicOps(base.TestDriver,
base.FunctionalInitWrapper):

View File

@ -61,6 +61,9 @@ TASK_ID_1 = 'b3006bd0-461e-4228-88ea-431c14e918b4'
TASK_ID_2 = '07b6b562-6770-4c8b-a649-37a515144ce9'
TASK_ID_3 = '72d16bb6-4d70-48a5-83fe-14bb842dc737'
NODE_REFERENCE_ID_1 = 1
NODE_REFERENCE_ID_2 = 2
def _db_fixture(id, **kwargs):
obj = {
@ -119,6 +122,26 @@ def _db_task_fixture(task_id, **kwargs):
return obj
def _db_node_reference_fixture(node_id, node_url, **kwargs):
obj = {
'node_reference_id': node_id,
'node_reference_url': node_url,
}
obj.update(kwargs)
return obj
def _db_cached_images_fixture(id, **kwargs):
obj = {
'id': id,
'image_id': kwargs.get('image_id'),
'size': kwargs.get('size'),
'hits': kwargs.get('hits')
}
obj.update(kwargs)
return obj
class TestImageRepo(test_utils.BaseTestCase):
def setUp(self):
@ -130,6 +153,29 @@ class TestImageRepo(test_utils.BaseTestCase):
self.image_factory = glance.domain.ImageFactory()
self._create_images()
self._create_image_members()
# Centralized cache
self._create_node_references()
self._create_cached_images()
def _create_node_references(self):
self.node_references = [
_db_node_reference_fixture(NODE_REFERENCE_ID_1, 'node_url_1'),
_db_node_reference_fixture(NODE_REFERENCE_ID_2, 'node_url_2'),
]
[self.db.node_reference_create(
None, node_reference['node_reference_url'],
node_reference_id=node_reference['node_reference_id']
) for node_reference in self.node_references]
def _create_cached_images(self):
self.cached_images = [
_db_cached_images_fixture(1, image_id=UUID1, size=256, hits=3),
_db_cached_images_fixture(1, image_id=UUID3, size=1024, hits=0)
]
[self.db.insert_cache_details(
None, 'node_url_1', cached_image['image_id'],
cached_image['size'], hits=cached_image['hits']
) for cached_image in self.cached_images]
def _create_images(self):
self.images = [
@ -207,6 +253,93 @@ class TestImageRepo(test_utils.BaseTestCase):
[self.db.image_member_create(None, image_member)
for image_member in self.image_members]
def test_node_reference_get_by_url(self):
node_reference = self.db.node_reference_get_by_url(self.context,
'node_url_1')
self.assertEqual(NODE_REFERENCE_ID_1,
node_reference['node_reference_id'])
def test_node_reference_get_by_url_not_found(self):
self.assertRaises(exception.NotFound,
self.db.node_reference_get_by_url,
self.context,
'garbage_url')
def test_get_cached_images(self):
# Two images are cached on node 'node_url_1'
cached_images = self.db.get_cached_images(self.context,
'node_url_1')
self.assertEqual(2, len(cached_images))
# Nothing is cached on node 'node_url_2'
cached_images = self.db.get_cached_images(self.context,
'node_url_2')
self.assertEqual(0, len(cached_images))
def test_get_hit_count(self):
# Hit count will be 3 for image UUID1
self.assertEqual(3, self.db.get_hit_count(self.context,
UUID1, 'node_url_1'))
# Hit count will be 0 for uncached image
self.assertEqual(0, self.db.get_hit_count(self.context,
UUID2, 'node_url_1'))
def test_delete_all_cached_images(self):
# Verify that we have image cached
cached_images = self.db.get_cached_images(self.context,
'node_url_1')
self.assertEqual(2, len(cached_images))
# Delete cached images from node_url_1
self.db.delete_all_cached_images(self.context, 'node_url_1')
# Verify that all cached images from node_url_1 are deleted
cached_images = self.db.get_cached_images(self.context,
'node_url_1')
self.assertEqual(0, len(cached_images))
def test_delete_cached_image(self):
# Verify that we have image cached
cached_images = self.db.get_cached_images(self.context,
'node_url_1')
self.assertEqual(2, len(cached_images))
# Delete cached image from node_url_1
self.db.delete_cached_image(self.context, UUID1, 'node_url_1')
# Verify that given image from node_url_1 is deleted
cached_images = self.db.get_cached_images(self.context,
'node_url_1')
self.assertEqual(1, len(cached_images))
def test_get_least_recently_accessed(self):
recently_accessed = self.db.get_least_recently_accessed(
self.context, 'node_url_1')
# Verify we will only get one image in response
self.assertEqual(UUID1, recently_accessed)
def test_is_image_cached_for_node(self):
# Verify UUID1 is cached for node_url_1
self.assertTrue(self.db.is_image_cached_for_node(
self.context, 'node_url_1', UUID1))
# Verify UUID3 is not cached for node_url_2
self.assertFalse(self.db.is_image_cached_for_node(
self.context, 'node_url_2', UUID3))
def test_update_hit_count(self):
# Verify UUID1 on node_url_1 has 3 as hit count
self.assertEqual(3, self.db.get_hit_count(self.context,
UUID1, 'node_url_1'))
# Update the hit count of UUID1
self.db.update_hit_count(self.context, UUID1, 'node_url_1')
# Verify hit count is now 4
self.assertEqual(4, self.db.get_hit_count(self.context,
UUID1, 'node_url_1'))
def test_get(self):
image = self.image_repo.get(UUID1)
self.assertEqual(UUID1, image.image_id)