Merge "Generic image-volume cache"

This commit is contained in:
Jenkins 2015-09-03 08:13:12 +00:00 committed by Gerrit Code Review
commit 9e5caa463f
17 changed files with 1618 additions and 34 deletions

View File

@ -1032,3 +1032,39 @@ def driver_initiator_data_update(context, initiator, namespace, updates):
def driver_initiator_data_get(context, initiator, namespace):
"""Query for an DriverPrivateData that has the specified key"""
return IMPL.driver_initiator_data_get(context, initiator, namespace)
###################
def image_volume_cache_create(context, host, image_id, image_updated_at,
volume_id, size):
"""Create a new image volume cache entry."""
return IMPL.image_volume_cache_create(context,
host,
image_id,
image_updated_at,
volume_id,
size)
def image_volume_cache_delete(context, volume_id):
"""Delete an image volume cache entry specified by volume id."""
return IMPL.image_volume_cache_delete(context, volume_id)
def image_volume_cache_get_and_update_last_used(context, image_id, host):
"""Query for an image volume cache entry."""
return IMPL.image_volume_cache_get_and_update_last_used(context,
image_id,
host)
def image_volume_cache_get_by_volume_id(context, volume_id):
"""Query to see if a volume id is an image-volume contained in the cache"""
return IMPL.image_volume_cache_get_by_volume_id(context, volume_id)
def image_volume_cache_get_all_for_host(context, host):
"""Query for all image volume cache entry for a host."""
return IMPL.image_volume_cache_get_all_for_host(context, host)

View File

@ -42,6 +42,7 @@ from sqlalchemy import or_
from sqlalchemy.orm import joinedload, joinedload_all
from sqlalchemy.orm import RelationshipProperty
from sqlalchemy.schema import Table
from sqlalchemy.sql.expression import desc
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import true
from sqlalchemy.sql import func
@ -3998,3 +3999,65 @@ PAGINATION_HELPERS = {
models.Snapshot: (_snaps_get_query, _process_snaps_filters, _snapshot_get),
models.Backup: (_backups_get_query, _process_backups_filters, _backup_get)
}
###############################
@require_context
def image_volume_cache_create(context, host, image_id, image_updated_at,
volume_id, size):
session = get_session()
with session.begin():
cache_entry = models.ImageVolumeCacheEntry()
cache_entry.host = host
cache_entry.image_id = image_id
cache_entry.image_updated_at = image_updated_at
cache_entry.volume_id = volume_id
cache_entry.size = size
session.add(cache_entry)
return cache_entry
@require_context
def image_volume_cache_delete(context, volume_id):
session = get_session()
with session.begin():
session.query(models.ImageVolumeCacheEntry).\
filter_by(volume_id=volume_id).\
delete()
@require_context
def image_volume_cache_get_and_update_last_used(context, image_id, host):
session = get_session()
with session.begin():
entry = session.query(models.ImageVolumeCacheEntry).\
filter_by(image_id=image_id).\
filter_by(host=host).\
order_by(desc(models.ImageVolumeCacheEntry.last_used)).\
first()
if entry:
entry.last_used = timeutils.utcnow()
entry.save(session=session)
return entry
@require_context
def image_volume_cache_get_by_volume_id(context, volume_id):
session = get_session()
with session.begin():
return session.query(models.ImageVolumeCacheEntry).\
filter_by(volume_id=volume_id).\
first()
@require_context
def image_volume_cache_get_all_for_host(context, host):
session = get_session()
with session.begin():
return session.query(models.ImageVolumeCacheEntry).\
filter_by(host=host).\
order_by(desc(models.ImageVolumeCacheEntry.last_used)).\
all()

View File

@ -0,0 +1,46 @@
# Copyright (C) 2015 Pure Storage, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column, DateTime, Integer
from sqlalchemy import MetaData, String, Table
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# New table
image_volume_cache = Table(
'image_volume_cache_entries', meta,
Column('image_updated_at', DateTime(timezone=False)),
Column('id', Integer, primary_key=True, nullable=False),
Column('host', String(length=255), index=True, nullable=False),
Column('image_id', String(length=36), index=True, nullable=False),
Column('volume_id', String(length=36), nullable=False),
Column('size', Integer, nullable=False),
Column('last_used', DateTime, nullable=False),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
image_volume_cache.create()
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
table_name = 'image_volume_cache_entries'
image_volume_cache = Table(table_name, meta, autoload=True)
image_volume_cache.drop()

View File

@ -601,6 +601,18 @@ class DriverInitiatorData(BASE, models.TimestampMixin, models.ModelBase):
value = Column(String(255))
class ImageVolumeCacheEntry(BASE, models.ModelBase):
"""Represents an image volume cache entry"""
__tablename__ = 'image_volume_cache_entries'
id = Column(Integer, primary_key=True, nullable=False)
host = Column(String(255), index=True, nullable=False)
image_id = Column(String(36), index=True, nullable=False)
image_updated_at = Column(DateTime, nullable=False)
volume_id = Column(String(36), nullable=False)
size = Column(Integer, nullable=False)
last_used = Column(DateTime, default=lambda: timeutils.utcnow())
def register_models():
"""Register Models and create metadata.

225
cinder/image/cache.py Normal file
View File

@ -0,0 +1,225 @@
# Copyright (C) 2015 Pure Storage, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from pytz import timezone
import six
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from cinder.i18n import _LW
from cinder import rpc
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class ImageVolumeCache(object):
def __init__(self, db, volume_api, max_cache_size_gb=0,
max_cache_size_count=0):
self.db = db
self.volume_api = volume_api
self.max_cache_size_gb = int(max_cache_size_gb)
self.max_cache_size_count = int(max_cache_size_count)
self.notifier = rpc.get_notifier('volume', CONF.host)
def get_by_image_volume(self, context, volume_id):
return self.db.image_volume_cache_get_by_volume_id(context, volume_id)
def evict(self, context, cache_entry):
LOG.debug('Evicting image cache entry: %(entry)s.',
{'entry': self._entry_to_str(cache_entry)})
self.db.image_volume_cache_delete(context, cache_entry['volume_id'])
self._notify_cache_eviction(context, cache_entry['image_id'],
cache_entry['host'])
def get_entry(self, context, volume_ref, image_id, image_meta):
cache_entry = self.db.image_volume_cache_get_and_update_last_used(
context,
image_id,
volume_ref['host']
)
if cache_entry:
LOG.debug('Found image-volume cache entry: %(entry)s.',
{'entry': self._entry_to_str(cache_entry)})
if self._should_update_entry(cache_entry, image_meta):
LOG.debug('Image-volume cache entry is out-dated, evicting: '
'%(entry)s.',
{'entry': self._entry_to_str(cache_entry)})
self._delete_image_volume(context, cache_entry)
cache_entry = None
if cache_entry:
self._notify_cache_hit(context, cache_entry['image_id'],
cache_entry['host'])
else:
self._notify_cache_miss(context, image_id,
volume_ref['host'])
return cache_entry
def create_cache_entry(self, context, volume_ref, image_id, image_meta):
"""Create a new cache entry for an image.
This assumes that the volume described by volume_ref has already been
created and is in an available state.
"""
LOG.debug('Creating new image-volume cache entry for image '
'%(image_id)s on host %(host)s.',
{'image_id': image_id, 'host': volume_ref['host']})
# When we are creating an image from a volume the updated_at field
# will be a unicode representation of the datetime. In that case
# we just need to parse it into one. If it is an actual datetime
# we want to just grab it as a UTC naive datetime.
image_updated_at = image_meta['updated_at']
if type(image_updated_at) in [unicode, str]:
image_updated_at = timeutils.parse_strtime(image_updated_at)
else:
image_updated_at = image_updated_at.astimezone(timezone('UTC'))
cache_entry = self.db.image_volume_cache_create(
context,
volume_ref['host'],
image_id,
image_updated_at.replace(tzinfo=None),
volume_ref['id'],
volume_ref['size']
)
LOG.debug('New image-volume cache entry created: %(entry)s.',
{'entry': self._entry_to_str(cache_entry)})
return cache_entry
def ensure_space(self, context, space_required, host):
"""Makes room for a cache entry.
Returns True if successful, false otherwise.
"""
# Check to see if the cache is actually limited.
if self.max_cache_size_gb == 0 and self.max_cache_size_count == 0:
return True
# Make sure that we can potentially fit the image in the cache
# and bail out before evicting everything else to try and make
# room for it.
if (self.max_cache_size_gb != 0 and
space_required > self.max_cache_size_gb):
return False
# Assume the entries are ordered by most recently used to least used.
entries = self.db.image_volume_cache_get_all_for_host(context, host)
current_count = len(entries)
current_size = 0
for entry in entries:
current_size += entry['size']
# Add values for the entry we intend to create.
current_size += space_required
current_count += 1
LOG.debug('Image-volume cache for host %(host)s current_size (GB) = '
'%(size_gb)s (max = %(max_gb)s), current count = %(count)s '
'(max = %(max_count)s).',
{'host': host,
'size_gb': current_size,
'max_gb': self.max_cache_size_gb,
'count': current_count,
'max_count': self.max_cache_size_count})
while ((current_size > self.max_cache_size_gb
or current_count > self.max_cache_size_count)
and len(entries)):
entry = entries.pop()
LOG.debug('Reclaiming image-volume cache space; removing cache '
'entry %(entry)s.', {'entry': self._entry_to_str(entry)})
self._delete_image_volume(context, entry)
current_size -= entry['size']
current_count -= 1
LOG.debug('Image-volume cache for host %(host)s new size (GB) = '
'%(size_gb)s, new count = %(count)s.',
{'host': host,
'size_gb': current_size,
'count': current_count})
# It is only possible to not free up enough gb, we will always be able
# to free enough count. This is because 0 means unlimited which means
# it is guaranteed to be >0 if limited, and we can always delete down
# to 0.
if self.max_cache_size_gb > 0:
if current_size > self.max_cache_size_gb > 0:
LOG.warning(_LW('Image-volume cache for host %(host)s does '
'not have enough space (GB).'), {'host': host})
return False
return True
def _notify_cache_hit(self, context, image_id, host):
self._notify_cache_action(context, image_id, host, 'hit')
def _notify_cache_miss(self, context, image_id, host):
self._notify_cache_action(context, image_id, host, 'miss')
def _notify_cache_eviction(self, context, image_id, host):
self._notify_cache_action(context, image_id, host, 'evict')
def _notify_cache_action(self, context, image_id, host, action):
data = {
'image_id': image_id,
'host': host,
}
LOG.debug('ImageVolumeCache notification: action=%(action)s'
' data=%(data)s.', {'action': action, 'data': data})
self.notifier.info(context, 'image_volume_cache.%s' % action, data)
def _delete_image_volume(self, context, cache_entry):
"""Delete a volume and remove cache entry."""
volume_ref = self.db.volume_get(context, cache_entry['volume_id'])
# Delete will evict the cache entry.
self.volume_api.delete(context, volume_ref)
def _get_image_volume_name(self, image_id):
return 'image-volume-' + image_id
def _should_update_entry(self, cache_entry, image_meta):
"""Ensure that the cache entry image data is still valid."""
image_updated_utc = (image_meta['updated_at']
.astimezone(timezone('UTC')))
cache_updated_utc = (cache_entry['image_updated_at']
.replace(tzinfo=timezone('UTC')))
LOG.debug('Image-volume cache entry image_update_at = %(entry_utc)s, '
'requested image updated_at = %(image_utc)s.',
{'entry_utc': six.text_type(cache_updated_utc),
'image_utc': six.text_type(image_updated_utc)})
return image_updated_utc != cache_updated_utc
def _entry_to_str(self, cache_entry):
return six.text_type({
'id': cache_entry['id'],
'image_id': cache_entry['image_id'],
'volume_id': cache_entry['volume_id'],
'host': cache_entry['host'],
'size': cache_entry['size'],
'image_updated_at': cache_entry['image_updated_at'],
'last_used': cache_entry['last_used'],
})

View File

@ -18,6 +18,7 @@
import copy
import datetime
import mock
import uuid
from cinder import exception
@ -138,6 +139,7 @@ class _FakeImageService(object):
self.create(None, image6)
self.create(None, image7)
self._imagedata = {}
self.temp_images = mock.MagicMock()
super(_FakeImageService, self).__init__()
# TODO(bcwaldon): implement optional kwargs such as limit, sort_dir

View File

@ -0,0 +1,295 @@
# Copyright (C) 2015 Pure Storage, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import timedelta
import mock
from oslo_utils import timeutils
from cinder import context as ctxt
from cinder import test
patcher = mock.patch('cinder.rpc')
patcher.start()
from cinder.image import cache as image_cache
patcher.stop()
class ImageVolumeCacheTestCase(test.TestCase):
def setUp(self):
super(ImageVolumeCacheTestCase, self).setUp()
self.mock_db = mock.Mock()
self.mock_volume_api = mock.Mock()
self.context = ctxt.get_admin_context()
self.notifier = mock.Mock()
def _build_cache(self, max_gb=0, max_count=0):
cache = image_cache.ImageVolumeCache(self.mock_db,
self.mock_volume_api,
max_gb,
max_count)
cache.notifier = self.notifier
return cache
def _build_entry(self, size=10):
entry = {
'id': 1,
'host': 'test@foo#bar',
'image_id': 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2',
'image_updated_at': timeutils.utcnow(with_timezone=True),
'volume_id': '70a599e0-31e7-49b7-b260-868f441e862b',
'size': size,
'last_used': timeutils.utcnow(with_timezone=True)
}
return entry
def test_get_by_image_volume(self):
cache = self._build_cache()
ret = {'id': 1}
volume_id = '70a599e0-31e7-49b7-b260-868f441e862b'
self.mock_db.image_volume_cache_get_by_volume_id.return_value = ret
entry = cache.get_by_image_volume(self.context, volume_id)
self.assertEqual(ret, entry)
self.mock_db.image_volume_cache_get_by_volume_id.return_value = None
entry = cache.get_by_image_volume(self.context, volume_id)
self.assertIsNone(entry)
def test_evict(self):
cache = self._build_cache()
entry = self._build_entry()
cache.evict(self.context, entry)
self.mock_db.image_volume_cache_delete.assert_called_once_with(
self.context,
entry['volume_id']
)
self.notifier.info.assert_called_once_with(
self.context,
'image_volume_cache.evict',
{'image_id': entry['image_id'], 'host': entry['host']}
)
def test_get_entry(self):
cache = self._build_cache()
entry = self._build_entry()
volume_ref = {
'host': 'foo@bar#whatever'
}
image_meta = {
'is_public': True,
'owner': '70a599e0-31e7-49b7-b260-868f441e862b',
'properties': {
'virtual_size': '1.7'
},
'updated_at': entry['image_updated_at']
}
(self.mock_db.
image_volume_cache_get_and_update_last_used.return_value) = entry
found_entry = cache.get_entry(self.context,
volume_ref,
entry['image_id'],
image_meta)
self.assertDictMatch(entry, found_entry)
(self.mock_db.
image_volume_cache_get_and_update_last_used.assert_called_once_with)(
self.context,
entry['image_id'],
volume_ref['host']
)
self.notifier.info.assert_called_once_with(
self.context,
'image_volume_cache.hit',
{'image_id': entry['image_id'], 'host': entry['host']}
)
def test_get_entry_not_exists(self):
cache = self._build_cache()
volume_ref = {
'host': 'foo@bar#whatever'
}
image_meta = {
'is_public': True,
'owner': '70a599e0-31e7-49b7-b260-868f441e862b',
'properties': {
'virtual_size': '1.7'
},
'updated_at': timeutils.utcnow(with_timezone=True)
}
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
(self.mock_db.
image_volume_cache_get_and_update_last_used.return_value) = None
found_entry = cache.get_entry(self.context,
volume_ref,
image_id,
image_meta)
self.assertIsNone(found_entry)
self.notifier.info.assert_called_once_with(
self.context,
'image_volume_cache.miss',
{'image_id': image_id, 'host': volume_ref['host']}
)
def test_get_entry_needs_update(self):
cache = self._build_cache()
entry = self._build_entry()
volume_ref = {
'host': 'foo@bar#whatever'
}
image_meta = {
'is_public': True,
'owner': '70a599e0-31e7-49b7-b260-868f441e862b',
'properties': {
'virtual_size': '1.7'
},
'updated_at': entry['image_updated_at'] + timedelta(hours=2)
}
(self.mock_db.
image_volume_cache_get_and_update_last_used.return_value) = entry
mock_volume = mock.Mock()
self.mock_db.volume_get.return_value = mock_volume
found_entry = cache.get_entry(self.context,
volume_ref,
entry['image_id'],
image_meta)
# Expect that the cache entry is not returned and the image-volume
# for it is deleted.
self.assertIsNone(found_entry)
self.mock_volume_api.delete.assert_called_with(self.context,
mock_volume)
self.notifier.info.assert_called_once_with(
self.context,
'image_volume_cache.miss',
{'image_id': entry['image_id'], 'host': volume_ref['host']}
)
def test_create_cache_entry(self):
cache = self._build_cache()
entry = self._build_entry()
volume_ref = {
'id': entry['volume_id'],
'host': entry['host'],
'size': entry['size']
}
image_meta = {
'updated_at': entry['image_updated_at']
}
self.mock_db.image_volume_cache_create.return_value = entry
created_entry = cache.create_cache_entry(self.context,
volume_ref,
entry['image_id'],
image_meta)
self.assertEqual(entry, created_entry)
self.mock_db.image_volume_cache_create.assert_called_once_with(
self.context,
entry['host'],
entry['image_id'],
entry['image_updated_at'].replace(tzinfo=None),
entry['volume_id'],
entry['size']
)
def test_ensure_space_unlimited(self):
cache = self._build_cache(max_gb=0, max_count=0)
host = 'foo@bar#whatever'
has_space = cache.ensure_space(self.context, 0, host)
self.assertTrue(has_space)
has_space = cache.ensure_space(self.context, 500, host)
self.assertTrue(has_space)
def test_ensure_space_no_entries(self):
cache = self._build_cache(max_gb=100, max_count=10)
host = 'foo@bar#whatever'
self.mock_db.image_volume_cache_get_all_for_host.return_value = []
has_space = cache.ensure_space(self.context, 5, host)
self.assertTrue(has_space)
has_space = cache.ensure_space(self.context, 101, host)
self.assertFalse(has_space)
def test_ensure_space_need_gb(self):
cache = self._build_cache(max_gb=30, max_count=10)
mock_delete = mock.patch.object(cache, '_delete_image_volume').start()
host = 'foo@bar#whatever'
entries = []
entry1 = self._build_entry(size=12)
entries.append(entry1)
entry2 = self._build_entry(size=5)
entries.append(entry2)
entry3 = self._build_entry(size=10)
entries.append(entry3)
self.mock_db.image_volume_cache_get_all_for_host.return_value = entries
has_space = cache.ensure_space(self.context, 15, host)
self.assertTrue(has_space)
self.assertEqual(2, mock_delete.call_count)
mock_delete.assert_any_call(self.context, entry2)
mock_delete.assert_any_call(self.context, entry3)
def test_ensure_space_need_count(self):
cache = self._build_cache(max_gb=30, max_count=2)
mock_delete = mock.patch.object(cache, '_delete_image_volume').start()
host = 'foo@bar#whatever'
entries = []
entry1 = self._build_entry(size=10)
entries.append(entry1)
entry2 = self._build_entry(size=5)
entries.append(entry2)
self.mock_db.image_volume_cache_get_all_for_host.return_value = entries
has_space = cache.ensure_space(self.context, 12, host)
self.assertTrue(has_space)
self.assertEqual(1, mock_delete.call_count)
mock_delete.assert_any_call(self.context, entry2)
def test_ensure_space_need_gb_and_count(self):
cache = self._build_cache(max_gb=30, max_count=3)
mock_delete = mock.patch.object(cache, '_delete_image_volume').start()
host = 'foo@bar#whatever'
entries = []
entry1 = self._build_entry(size=10)
entries.append(entry1)
entry2 = self._build_entry(size=5)
entries.append(entry2)
entry3 = self._build_entry(size=12)
entries.append(entry3)
self.mock_db.image_volume_cache_get_all_for_host.return_value = entries
has_space = cache.ensure_space(self.context, 16, host)
self.assertTrue(has_space)
self.assertEqual(2, mock_delete.call_count)
mock_delete.assert_any_call(self.context, entry2)
mock_delete.assert_any_call(self.context, entry3)
def test_ensure_space_cant_free_enough_gb(self):
cache = self._build_cache(max_gb=30, max_count=10)
mock_delete = mock.patch.object(cache, '_delete_image_volume').start()
host = 'foo@bar#whatever'
entries = list(self._build_entry(size=25))
self.mock_db.image_volume_cache_get_all_for_host.return_value = entries
has_space = cache.ensure_space(self.context, 50, host)
self.assertFalse(has_space)
mock_delete.assert_not_called()

View File

@ -2067,3 +2067,140 @@ class DBAPIDriverInitiatorDataTestCase(BaseTest):
update = {'remove_values': ['key_that_doesnt_exist']}
db.driver_initiator_data_update(self.ctxt, self.initiator,
self.namespace, update)
class DBAPIImageVolumeCacheEntryTestCase(BaseTest):
def _validate_entry(self, entry, host, image_id, image_updated_at,
volume_id, size):
self.assertIsNotNone(entry)
self.assertIsNotNone(entry['id'])
self.assertEqual(host, entry['host'])
self.assertEqual(image_id, entry['image_id'])
self.assertEqual(image_updated_at, entry['image_updated_at'])
self.assertEqual(volume_id, entry['volume_id'])
self.assertEqual(size, entry['size'])
self.assertIsNotNone(entry['last_used'])
def test_create_delete_query_cache_entry(self):
host = 'abc@123#poolz'
image_id = 'c06764d7-54b0-4471-acce-62e79452a38b'
image_updated_at = datetime.datetime.utcnow()
volume_id = 'e0e4f819-24bb-49e6-af1e-67fb77fc07d1'
size = 6
entry = db.image_volume_cache_create(self.ctxt, host, image_id,
image_updated_at, volume_id, size)
self._validate_entry(entry, host, image_id, image_updated_at,
volume_id, size)
entry = db.image_volume_cache_get_and_update_last_used(self.ctxt,
image_id,
host)
self._validate_entry(entry, host, image_id, image_updated_at,
volume_id, size)
entry = db.image_volume_cache_get_by_volume_id(self.ctxt, volume_id)
self._validate_entry(entry, host, image_id, image_updated_at,
volume_id, size)
db.image_volume_cache_delete(self.ctxt, entry['volume_id'])
entry = db.image_volume_cache_get_and_update_last_used(self.ctxt,
image_id,
host)
self.assertIsNone(entry)
def test_cache_entry_get_multiple(self):
host = 'abc@123#poolz'
image_id = 'c06764d7-54b0-4471-acce-62e79452a38b'
image_updated_at = datetime.datetime.utcnow()
volume_id = 'e0e4f819-24bb-49e6-af1e-67fb77fc07d1'
size = 6
entries = []
for i in range(0, 3):
entries.append(db.image_volume_cache_create(self.ctxt,
host,
image_id,
image_updated_at,
volume_id,
size))
# It is considered OK for the cache to have multiple of the same
# entries. Expect only a single one from the query.
entry = db.image_volume_cache_get_and_update_last_used(self.ctxt,
image_id,
host)
self._validate_entry(entry, host, image_id, image_updated_at,
volume_id, size)
# We expect to get the same one on subsequent queries due to the
# last_used field being updated each time and ordering by it.
entry_id = entry['id']
entry = db.image_volume_cache_get_and_update_last_used(self.ctxt,
image_id,
host)
self._validate_entry(entry, host, image_id, image_updated_at,
volume_id, size)
self.assertEqual(entry_id, entry['id'])
# Cleanup
for entry in entries:
db.image_volume_cache_delete(self.ctxt, entry['volume_id'])
def test_cache_entry_get_none(self):
host = 'abc@123#poolz'
image_id = 'c06764d7-54b0-4471-acce-62e79452a38b'
entry = db.image_volume_cache_get_and_update_last_used(self.ctxt,
image_id,
host)
self.assertIsNone(entry)
def test_cache_entry_get_by_volume_id_none(self):
volume_id = 'e0e4f819-24bb-49e6-af1e-67fb77fc07d1'
entry = db.image_volume_cache_get_by_volume_id(self.ctxt, volume_id)
self.assertIsNone(entry)
def test_cache_entry_get_all_for_host(self):
host = 'abc@123#poolz'
image_updated_at = datetime.datetime.utcnow()
size = 6
entries = []
for i in range(0, 3):
entries.append(db.image_volume_cache_create(self.ctxt,
host,
'image-' + str(i),
image_updated_at,
'vol-' + str(i),
size))
other_entry = db.image_volume_cache_create(self.ctxt,
'someOtherHost',
'image-12345',
image_updated_at,
'vol-1234',
size)
found_entries = db.image_volume_cache_get_all_for_host(self.ctxt, host)
self.assertIsNotNone(found_entries)
self.assertEqual(len(entries), len(found_entries))
for found_entry in found_entries:
for entry in entries:
if found_entry['id'] == entry['id']:
self._validate_entry(found_entry,
entry['host'],
entry['image_id'],
entry['image_updated_at'],
entry['volume_id'],
entry['size'])
# Cleanup
db.image_volume_cache_delete(self.ctxt, other_entry['volume_id'])
for entry in entries:
db.image_volume_cache_delete(self.ctxt, entry['volume_id'])
def test_cache_entry_get_all_for_host_none(self):
host = 'abc@123#poolz'
entries = db.image_volume_cache_get_all_for_host(self.ctxt, host)
self.assertEqual([], entries)

View File

@ -892,6 +892,38 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
backups = db_utils.get_table(engine, 'backups')
self.assertNotIn('num_dependent_backups', backups.c)
def _check_055(self, engine, data):
"""Test adding image_volume_cache_entries table."""
has_table = engine.dialect.has_table(engine.connect(),
"image_volume_cache_entries")
self.assertTrue(has_table)
private_data = db_utils.get_table(
engine,
'image_volume_cache_entries'
)
self.assertIsInstance(private_data.c.id.type,
sqlalchemy.types.INTEGER)
self.assertIsInstance(private_data.c.host.type,
sqlalchemy.types.VARCHAR)
self.assertIsInstance(private_data.c.image_id.type,
sqlalchemy.types.VARCHAR)
self.assertIsInstance(private_data.c.image_updated_at.type,
self.TIME_TYPE)
self.assertIsInstance(private_data.c.volume_id.type,
sqlalchemy.types.VARCHAR)
self.assertIsInstance(private_data.c.size.type,
sqlalchemy.types.INTEGER)
self.assertIsInstance(private_data.c.last_used.type,
self.TIME_TYPE)
def _post_downgrade_055(self, engine):
"""Test removing image_volume_cache_entries table."""
has_table = engine.dialect.has_table(engine.connect(),
"image_volume_cache_entries")
self.assertFalse(has_table)
def test_walk_versions(self):
self.walk_versions(True, False)

View File

@ -31,6 +31,7 @@ from cinder.image import image_utils
from cinder import test
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import test_volume
from cinder.tests.unit import utils
from cinder.volume import configuration as conf
import cinder.volume.drivers.rbd as driver
from cinder.volume.flows.manager import create_volume
@ -1145,13 +1146,15 @@ class ManagedRBDTestCase(test_volume.DriverTestCase):
self.assertTrue(mock_clone_image.called)
self.assertFalse(mock_create.called)
def test_create_vol_from_non_raw_image_status_available(self):
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
def test_create_vol_from_non_raw_image_status_available(self, mock_fetch):
"""Clone non-raw image then verify volume is in available state."""
def _mock_clone_image(context, volume, image_location,
image_meta, image_service):
return {'provider_location': None}, False
mock_fetch.return_value = mock.MagicMock(spec=utils.get_file_spec())
with mock.patch.object(self.volume.driver, 'clone_image') as \
mock_clone_image:
mock_clone_image.side_effect = _mock_clone_image

View File

@ -3489,9 +3489,11 @@ class VolumeTestCase(BaseVolumeTestCase):
self.context,
volume_id)
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask._clone_image_volume')
def _create_volume_from_image(self, mock_clone_image_volume,
mock_fetch_img,
fakeout_copy_image_to_volume=False,
fakeout_clone_image=False,
clone_image_volume=False):
@ -3527,6 +3529,8 @@ class VolumeTestCase(BaseVolumeTestCase):
self.stubs.Set(self.volume, '_copy_image_to_volume',
fake_copy_image_to_volume)
mock_clone_image_volume.return_value = ({}, clone_image_volume)
mock_fetch_img.return_value = mock.MagicMock(
spec=tests_utils.get_file_spec())
image_id = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
volume_id = tests_utils.create_volume(self.context,
@ -7353,3 +7357,61 @@ class VolumePolicyTestCase(test.TestCase):
cinder.policy.enforce(self.context, 'volume:attach', target)
self.mox.ReplayAll()
cinder.volume.api.check_policy(self.context, 'attach', {'id': 2})
class ImageVolumeCacheTestCase(BaseVolumeTestCase):
def setUp(self):
super(ImageVolumeCacheTestCase, self).setUp()
self.volume.driver.set_initialized()
@mock.patch('oslo_utils.importutils.import_object')
def test_cache_configs(self, mock_import_object):
opts = {
'image_volume_cache_enabled': True,
'image_volume_cache_max_size_gb': 100,
'image_volume_cache_max_count': 20
}
def conf_get(option):
if option in opts:
return opts[option]
else:
return None
mock_driver = mock.Mock()
mock_driver.configuration.safe_get.side_effect = conf_get
mock_driver.configuration.extra_capabilities = 'null'
def import_obj(*args, **kwargs):
return mock_driver
mock_import_object.side_effect = import_obj
manager = vol_manager.VolumeManager(volume_driver=mock_driver)
self.assertIsNotNone(manager)
self.assertIsNotNone(manager.image_volume_cache)
self.assertEqual(100, manager.image_volume_cache.max_cache_size_gb)
self.assertEqual(20, manager.image_volume_cache.max_cache_size_count)
def test_delete_image_volume(self):
volume_params = {
'status': 'creating',
'host': 'some_host',
'size': 1
}
volume_api = cinder.volume.api.API()
volume = tests_utils.create_volume(self.context, **volume_params)
volume = db.volume_update(self.context, volume['id'],
{'status': 'available'})
image_id = '70a599e0-31e7-49b7-b260-868f441e862b'
db.image_volume_cache_create(self.context,
volume['host'],
image_id,
datetime.datetime.utcnow(),
volume['id'],
volume['size'])
volume_api.delete(self.context, volume)
entry = db.image_volume_cache_get_by_volume_id(self.context,
volume['id'])
self.assertIsNone(entry)

View File

@ -14,6 +14,7 @@
#
import socket
import sys
import uuid
from oslo_service import loopingcall
@ -209,3 +210,26 @@ def replace_obj_loader(testcase, obj):
testcase.addCleanup(setattr, obj, 'obj_load_attr', obj.obj_load_attr)
obj.obj_load_attr = fake_obj_load_attr
file_spec = None
def get_file_spec():
"""Return a Python 2 and 3 compatible version of a 'file' spec.
This is to be used anywhere that you need to do something such as
mock.MagicMock(spec=file) to mock out something with the file attributes.
Due to the 'file' built-in method being removed in Python 3 we need to do
some special handling for it.
"""
global file_spec
# set on first use
if file_spec is None:
if sys.version_info[0] == 3:
import _io
file_spec = list(set(dir(_io.TextIOWrapper)).union(
set(dir(_io.BytesIO))))
else:
file_spec = file

View File

@ -18,12 +18,14 @@ import mock
from cinder import context
from cinder import exception
from cinder.openstack.common import imageutils
from cinder import test
from cinder.tests.unit import fake_consistencygroup
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit.keymgr import mock_key_mgr
from cinder.tests.unit import utils
from cinder.tests.unit.volume.flows import fake_volume_api
from cinder.volume.flows.api import create_volume
from cinder.volume.flows.manager import create_volume as create_volume_manager
@ -190,8 +192,9 @@ class CreateVolumeFlowManagerTestCase(test.TestCase):
def test_create_from_snapshot(self, snapshot_get_by_id, handle_bootable):
fake_db = mock.MagicMock()
fake_driver = mock.MagicMock()
fake_volume_manager = mock.MagicMock()
fake_manager = create_volume_manager.CreateVolumeFromSpecTask(
fake_db, fake_driver)
fake_volume_manager, fake_db, fake_driver)
volume = fake_volume.fake_db_volume()
orig_volume_db = mock.MagicMock(id=10, bootable=True)
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctxt)
@ -211,8 +214,9 @@ class CreateVolumeFlowManagerTestCase(test.TestCase):
def test_create_from_snapshot_update_failure(self, snapshot_get_by_id):
fake_db = mock.MagicMock()
fake_driver = mock.MagicMock()
fake_volume_manager = mock.MagicMock()
fake_manager = create_volume_manager.CreateVolumeFromSpecTask(
fake_db, fake_driver)
fake_volume_manager, fake_db, fake_driver)
volume = fake_volume.fake_db_volume()
snapshot_obj = fake_snapshot.fake_snapshot_obj(self.ctxt)
snapshot_get_by_id.return_value = snapshot_obj
@ -231,16 +235,20 @@ class CreateVolumeFlowManagerGlanceCinderBackendCase(test.TestCase):
super(CreateVolumeFlowManagerGlanceCinderBackendCase, self).setUp()
self.ctxt = context.get_admin_context()
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask.'
'_handle_bootable_volume_glance_meta')
def test_create_from_image_volume(self, handle_bootable, format='raw',
owner=None, location=True):
def test_create_from_image_volume(self, handle_bootable, mock_fetch_img,
format='raw', owner=None,
location=True):
self.flags(allowed_direct_url_schemes=['cinder'])
mock_fetch_img.return_value = mock.MagicMock(
spec=utils.get_file_spec())
fake_db = mock.MagicMock()
fake_driver = mock.MagicMock()
fake_manager = create_volume_manager.CreateVolumeFromSpecTask(
fake_db, fake_driver)
mock.MagicMock(), fake_db, fake_driver)
fake_image_service = mock.MagicMock()
volume = fake_volume.fake_volume_obj(self.ctxt)
image_volume = fake_volume.fake_volume_obj(self.ctxt,
@ -280,3 +288,429 @@ class CreateVolumeFlowManagerGlanceCinderBackendCase(test.TestCase):
def test_create_from_image_volume_without_location(self):
self.test_create_from_image_volume(location=False)
@mock.patch('cinder.image.image_utils.TemporaryImages.fetch')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask.'
'_handle_bootable_volume_glance_meta')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask.'
'_create_from_source_volume')
@mock.patch('cinder.volume.flows.manager.create_volume.'
'CreateVolumeFromSpecTask.'
'_create_from_image_download')
@mock.patch('cinder.context.get_internal_tenant_context')
class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
def setUp(self):
super(CreateVolumeFlowManagerImageCacheTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.mock_db = mock.MagicMock()
self.mock_driver = mock.MagicMock()
self.mock_cache = mock.MagicMock()
self.mock_image_service = mock.MagicMock()
self.mock_volume_manager = mock.MagicMock()
self.internal_context = self.ctxt
self.internal_context.user_id = 'abc123'
self.internal_context.project_id = 'def456'
def test_create_from_image_clone_image_and_skip_cache(
self, mock_get_internal_context, mock_create_from_img_dl,
mock_create_from_src, mock_handle_bootable, mock_fetch_img):
self.mock_driver.clone_image.return_value = (None, True)
volume = fake_volume.fake_volume_obj(self.ctxt)
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = mock.Mock()
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
manager._create_from_image(self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service)
# Make sure clone_image is always called even if the cache is enabled
self.assertTrue(self.mock_driver.clone_image.called)
# Create from source shouldn't happen if clone_image succeeds
self.assertFalse(mock_create_from_src.called)
# The image download should not happen if clone_image succeeds
self.assertFalse(mock_create_from_img_dl.called)
mock_handle_bootable.assert_called_once_with(
self.ctxt,
volume['id'],
image_id=image_id,
image_meta=image_meta
)
def test_create_from_image_cannot_use_cache(
self, mock_get_internal_context, mock_create_from_img_dl,
mock_create_from_src, mock_handle_bootable, mock_fetch_img):
mock_get_internal_context.return_value = None
self.mock_driver.clone_image.return_value = (None, False)
volume = fake_volume.fake_volume_obj(self.ctxt)
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = {
'properties': {
'virtual_size': '2147483648'
}
}
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
manager._create_from_image(self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service)
# Make sure clone_image is always called
self.assertTrue(self.mock_driver.clone_image.called)
# Create from source shouldn't happen if cache cannot be used.
self.assertFalse(mock_create_from_src.called)
# The image download should happen if clone fails and we can't use the
# image-volume cache.
mock_create_from_img_dl.assert_called_once_with(
self.ctxt,
volume,
image_location,
image_id,
self.mock_image_service
)
# This should not attempt to use a minimal size volume
self.assertFalse(self.mock_db.volume_update.called)
# Make sure we didn't try and create a cache entry
self.assertFalse(self.mock_cache.ensure_space.called)
self.assertFalse(self.mock_cache.create_cache_entry.called)
mock_handle_bootable.assert_called_once_with(
self.ctxt,
volume['id'],
image_id=image_id,
image_meta=image_meta
)
def test_create_from_image_cache_hit(
self, mock_get_internal_context, mock_create_from_img_dl,
mock_create_from_src, mock_handle_bootable, mock_fetch_img):
self.mock_driver.clone_image.return_value = (None, False)
image_volume_id = '70a599e0-31e7-49b7-b260-868f441e862b'
self.mock_cache.get_entry.return_value = {
'volume_id': image_volume_id
}
volume = fake_volume.fake_volume_obj(self.ctxt)
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = mock.Mock()
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
manager._create_from_image(self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service)
# Make sure clone_image is always called even if the cache is enabled
self.assertTrue(self.mock_driver.clone_image.called)
# For a cache hit it should only clone from the image-volume
mock_create_from_src.assert_called_once_with(self.ctxt,
volume,
image_volume_id)
# The image download should not happen when we get a cache hit
self.assertFalse(mock_create_from_img_dl.called)
mock_handle_bootable.assert_called_once_with(
self.ctxt,
volume['id'],
image_id=image_id,
image_meta=image_meta
)
@mock.patch('cinder.image.image_utils.qemu_img_info')
def test_create_from_image_cache_miss(
self, mock_qemu_info, mock_get_internal_context,
mock_create_from_img_dl, mock_create_from_src,
mock_handle_bootable, mock_fetch_img):
mock_get_internal_context.return_value = self.ctxt
mock_fetch_img.return_value = mock.MagicMock(
spec=utils.get_file_spec())
image_info = imageutils.QemuImgInfo()
image_info.virtual_size = '2147483648'
mock_qemu_info.return_value = image_info
self.mock_driver.clone_image.return_value = (None, False)
self.mock_cache.get_entry.return_value = None
volume = fake_volume.fake_volume_obj(self.ctxt, size=10,
host='foo@bar#pool')
image_volume = fake_volume.fake_db_volume(size=2)
self.mock_db.volume_create.return_value = image_volume
def update_volume(ctxt, id, updates):
volume.update(updates)
return volume
self.mock_db.volume_update.side_effect = update_volume
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = mock.MagicMock()
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
manager._create_from_image(self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service)
# Make sure clone_image is always called
self.assertTrue(self.mock_driver.clone_image.called)
# The image download should happen if clone fails and
# we get a cache miss
mock_create_from_img_dl.assert_called_once_with(
self.ctxt,
mock.ANY,
image_location,
image_id,
self.mock_image_service
)
# The volume size should be reduced to virtual_size and then put back
self.mock_db.volume_update.assert_any_call(self.ctxt,
volume['id'],
{'size': 2})
self.mock_db.volume_update.assert_any_call(self.ctxt,
volume['id'],
{'size': 10})
# Make sure created a new cache entry
(self.mock_volume_manager.
_create_image_cache_volume_entry.assert_called_once_with(
self.ctxt, volume, image_id, image_meta))
mock_handle_bootable.assert_called_once_with(
self.ctxt,
volume['id'],
image_id=image_id,
image_meta=image_meta
)
@mock.patch('cinder.image.image_utils.qemu_img_info')
def test_create_from_image_cache_miss_error_downloading(
self, mock_qemu_info, mock_get_internal_context,
mock_create_from_img_dl, mock_create_from_src,
mock_handle_bootable, mock_fetch_img):
mock_fetch_img.return_value = mock.MagicMock()
image_info = imageutils.QemuImgInfo()
image_info.virtual_size = '2147483648'
mock_qemu_info.return_value = image_info
self.mock_driver.clone_image.return_value = (None, False)
self.mock_cache.get_entry.return_value = None
volume = fake_volume.fake_volume_obj(self.ctxt, size=10,
host='foo@bar#pool')
image_volume = fake_volume.fake_db_volume(size=2)
self.mock_db.volume_create.return_value = image_volume
mock_create_from_img_dl.side_effect = exception.CinderException()
def update_volume(ctxt, id, updates):
volume.update(updates)
return volume
self.mock_db.volume_update.side_effect = update_volume
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = mock.MagicMock()
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
self.assertRaises(
exception.CinderException,
manager._create_from_image,
self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service
)
# Make sure clone_image is always called
self.assertTrue(self.mock_driver.clone_image.called)
# The image download should happen if clone fails and
# we get a cache miss
mock_create_from_img_dl.assert_called_once_with(
self.ctxt,
mock.ANY,
image_location,
image_id,
self.mock_image_service
)
# The volume size should be reduced to virtual_size and then put back,
# especially if there is an exception while creating the volume.
self.assertEqual(2, self.mock_db.volume_update.call_count)
self.mock_db.volume_update.assert_any_call(self.ctxt,
volume['id'],
{'size': 2})
self.mock_db.volume_update.assert_any_call(self.ctxt,
volume['id'],
{'size': 10})
# Make sure we didn't try and create a cache entry
self.assertFalse(self.mock_cache.ensure_space.called)
self.assertFalse(self.mock_cache.create_cache_entry.called)
def test_create_from_image_no_internal_context(
self, mock_get_internal_context, mock_create_from_img_dl,
mock_create_from_src, mock_handle_bootable, mock_fetch_img):
self.mock_driver.clone_image.return_value = (None, False)
mock_get_internal_context.return_value = None
volume = fake_volume.fake_db_volume()
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = {
'properties': {
'virtual_size': '2147483648'
}
}
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
manager._create_from_image(self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service)
# Make sure clone_image is always called
self.assertTrue(self.mock_driver.clone_image.called)
# Create from source shouldn't happen if cache cannot be used.
self.assertFalse(mock_create_from_src.called)
# The image download should happen if clone fails and we can't use the
# image-volume cache due to not having an internal context available.
mock_create_from_img_dl.assert_called_once_with(
self.ctxt,
volume,
image_location,
image_id,
self.mock_image_service
)
# This should not attempt to use a minimal size volume
self.assertFalse(self.mock_db.volume_update.called)
# Make sure we didn't try and create a cache entry
self.assertFalse(self.mock_cache.ensure_space.called)
self.assertFalse(self.mock_cache.create_cache_entry.called)
mock_handle_bootable.assert_called_once_with(
self.ctxt,
volume['id'],
image_id=image_id,
image_meta=image_meta
)
@mock.patch('cinder.image.image_utils.qemu_img_info')
def test_create_from_image_cache_miss_error_size_invalid(
self, mock_qemu_info, mock_get_internal_context,
mock_create_from_img_dl, mock_create_from_src,
mock_handle_bootable, mock_fetch_img):
mock_fetch_img.return_value = mock.MagicMock()
image_info = imageutils.QemuImgInfo()
image_info.virtual_size = '2147483648'
mock_qemu_info.return_value = image_info
self.mock_driver.clone_image.return_value = (None, False)
self.mock_cache.get_entry.return_value = None
volume = fake_volume.fake_volume_obj(self.ctxt, size=1,
host='foo@bar#pool')
image_volume = fake_volume.fake_db_volume(size=2)
self.mock_db.volume_create.return_value = image_volume
image_location = 'someImageLocationStr'
image_id = 'c7a8b8d4-e519-46c7-a0df-ddf1b9b9fff2'
image_meta = mock.MagicMock()
manager = create_volume_manager.CreateVolumeFromSpecTask(
self.mock_volume_manager,
self.mock_db,
self.mock_driver,
image_volume_cache=self.mock_cache
)
self.assertRaises(
exception.ImageUnacceptable,
manager._create_from_image,
self.ctxt,
volume,
image_location,
image_id,
image_meta,
self.mock_image_service
)
# The volume size should NOT be changed when in this case
self.assertFalse(self.mock_db.volume_update.called)
# Make sure we didn't try and create a cache entry
self.assertFalse(self.mock_cache.ensure_space.called)
self.assertFalse(self.mock_cache.create_cache_entry.called)

View File

@ -34,6 +34,7 @@ from cinder.db import base
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import cache as image_cache
from cinder.image import glance
from cinder import keymgr
from cinder import objects
@ -392,6 +393,11 @@ class API(base.Base):
"snapshots.") % len(snapshots)
raise exception.InvalidVolume(reason=msg)
cache = image_cache.ImageVolumeCache(self.db, self)
entry = cache.get_by_image_volume(context, volume_id)
if entry:
cache.evict(context, entry)
# If the volume is encrypted, delete its encryption key from the key
# manager. This operation makes volume deletion an irreversible process
# because the volume cannot be decrypted without its key.
@ -1204,7 +1210,8 @@ class API(base.Base):
pass
recv_metadata = self.image_service.create(context, metadata)
recv_metadata = self.image_service.create(
context, self.image_service._translate_to_glance(metadata))
self.update(context, volume, {'status': 'uploading'})
self.volume_rpcapi.copy_volume_to_image(context,
volume,

View File

@ -249,6 +249,17 @@ volume_opts = [
'upload-to-image will be placed in the internal tenant. '
'Otherwise, the image volume is created in the current '
'context\'s tenant.'),
cfg.BoolOpt('image_volume_cache_enabled',
default=False,
help='Enable the image volume cache for this backend.'),
cfg.IntOpt('image_volume_cache_max_size_gb',
default=0,
help='Max size of the image volume cache for this backend in '
'GB. 0 => unlimited.'),
cfg.IntOpt('image_volume_cache_max_count',
default=0,
help='Max number of entries allowed in the image volume cache. '
'0 => unlimited.'),
]
# for backward compatibility

View File

@ -10,20 +10,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import math
import traceback
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import units
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft
from cinder import context as cinder_context
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE, _LI
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import glance
from cinder.image import image_utils
from cinder import objects
from cinder import utils
from cinder.volume.flows import common
@ -350,10 +354,12 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
default_provides = 'volume'
def __init__(self, db, driver):
def __init__(self, manager, db, driver, image_volume_cache=None):
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
self.manager = manager
self.db = db
self.driver = driver
self.image_volume_cache = image_volume_cache
def _handle_bootable_volume_glance_meta(self, context, volume_id,
**kwargs):
@ -623,6 +629,59 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
{'id': image_volume['id']})
return None, False
def _create_from_image_download(self, context, volume_ref, image_location,
image_id, image_service):
# TODO(harlowja): what needs to be rolled back in the clone if this
# volume create fails?? Likely this should be a subflow or broken
# out task in the future. That will bring up the question of how
# do we make said subflow/task which is only triggered in the
# clone image 'path' resumable and revertable in the correct
# manner.
model_update = self.driver.create_volume(volume_ref)
updates = dict(model_update or dict(), status='downloading')
try:
volume_ref = self.db.volume_update(context,
volume_ref['id'], updates)
except exception.CinderException:
LOG.exception(_LE("Failed updating volume %(volume_id)s with "
"%(updates)s"),
{'volume_id': volume_ref['id'],
'updates': updates})
self._copy_image_to_volume(context, volume_ref,
image_id, image_location, image_service)
return model_update
def _create_from_image_cache(self, context, internal_context, volume_ref,
image_id, image_meta):
"""Attempt to create the volume using the image cache.
Best case this will simply clone the existing volume in the cache.
Worst case the image is out of date and will be evicted. In that case
a clone will not be created and the image must be downloaded again.
"""
LOG.debug('Attempting to retrieve cache entry for image = '
'%(image_id)s on host %(host)s.',
{'image_id': image_id, 'host': volume_ref['host']})
try:
cache_entry = self.image_volume_cache.get_entry(internal_context,
volume_ref,
image_id,
image_meta)
if cache_entry:
LOG.debug('Creating from source image-volume %(volume_id)s',
{'volume_id': cache_entry['volume_id']})
model_update = self._create_from_source_volume(
context,
volume_ref,
cache_entry['volume_id']
)
return model_update, True
except exception.CinderException as e:
LOG.warning(_LW('Failed to create volume from image-volume cache, '
'will fall back to default behavior. Error: '
'%(exception)s'), {'exception': e})
return None, False
def _create_from_image(self, context, volume_ref,
image_location, image_id, image_meta,
image_service, **kwargs):
@ -630,8 +689,11 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
" at location %(image_location)s.",
{'volume_id': volume_ref['id'],
'image_location': image_location, 'image_id': image_id})
# Create the volume from an image.
#
# First see if the driver can clone the image directly.
#
# NOTE (singn): two params need to be returned
# dict containing provider_location for cloned volume
# and clone status.
@ -640,32 +702,92 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
image_location,
image_meta,
image_service)
# Try and clone the image if we have it set as a glance location.
if not cloned and 'cinder' in CONF.allowed_direct_url_schemes:
model_update, cloned = self._clone_image_volume(context,
volume_ref,
image_location,
image_meta)
if not cloned:
# TODO(harlowja): what needs to be rolled back in the clone if this
# volume create fails?? Likely this should be a subflow or broken
# out task in the future. That will bring up the question of how
# do we make said subflow/task which is only triggered in the
# clone image 'path' resumable and revertable in the correct
# manner.
#
# Create the volume and then download the image onto the volume.
model_update = self.driver.create_volume(volume_ref)
updates = dict(model_update or dict(), status='downloading')
try:
volume_ref = self.db.volume_update(context,
volume_ref['id'], updates)
except exception.CinderException:
LOG.exception(_LE("Failed updating volume %(volume_id)s with "
"%(updates)s"),
{'volume_id': volume_ref['id'],
'updates': updates})
self._copy_image_to_volume(context, volume_ref,
image_id, image_location, image_service)
# Try and use the image cache.
should_create_cache_entry = False
internal_context = cinder_context.get_internal_tenant_context()
if not internal_context:
LOG.warning(_LW('Unable to get Cinder internal context, will '
'not use image-volume cache.'))
if not cloned and internal_context and self.image_volume_cache:
model_update, cloned = self._create_from_image_cache(
context,
internal_context,
volume_ref,
image_id,
image_meta
)
if not cloned:
should_create_cache_entry = True
# Fall back to default behavior of creating volume,
# download the image data and copy it into the volume.
original_size = volume_ref['size']
try:
if not cloned:
with image_utils.TemporaryImages.fetch(
image_service, context, image_id) as tmp_image:
# Try to create the volume as the minimal size, then we can
# extend once the image has been downloaded.
if should_create_cache_entry:
data = image_utils.qemu_img_info(tmp_image)
virtual_size = int(
math.ceil(float(data.virtual_size) / units.Gi))
if virtual_size > volume_ref.size:
params = {'image_size': virtual_size,
'volume_size': volume_ref.size}
reason = _("Image virtual size is %(image_size)dGB"
" and doesn't fit in a volume of size"
" %(volume_size)dGB.") % params
raise exception.ImageUnacceptable(
image_id=image_id, reason=reason)
if virtual_size and virtual_size != original_size:
updates = {'size': virtual_size}
volume_ref = self.db.volume_update(
context,
volume_ref['id'],
updates
)
model_update = self._create_from_image_download(
context,
volume_ref,
image_location,
image_id,
image_service
)
if should_create_cache_entry:
# Update the newly created volume db entry before we clone it
# for the image-volume creation.
if model_update:
volume_ref = self.db.volume_update(context,
volume_ref['id'],
model_update)
self.manager._create_image_cache_volume_entry(internal_context,
volume_ref,
image_id,
image_meta)
finally:
# If we created the volume as the minimal size, extend it back to
# what was originally requested. If an exception has occurred we
# still need to put this back before letting it be raised further
# up the stack.
if volume_ref['size'] != original_size:
self.driver.extend_volume(volume_ref, original_size)
updates = {'size': original_size}
self.db.volume_update(context, volume_ref['id'], updates)
self._handle_bootable_volume_glance_meta(context, volume_ref['id'],
image_id=image_id,
@ -775,9 +897,10 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
'volume_id': volume_id})
def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume_id,
allow_reschedule, reschedule_context, request_spec,
filter_properties):
filter_properties, image_volume_cache=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
@ -823,7 +946,10 @@ def get_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
volume_flow.add(ExtractVolumeSpecTask(db),
NotifyVolumeActionTask(db, "create.start"),
CreateVolumeFromSpecTask(db, driver),
CreateVolumeFromSpecTask(manager,
db,
driver,
image_volume_cache),
CreateVolumeOnFinishTask(db, "create.end"))
# Now load (but do not run) the flow using the provided initial data.

View File

@ -58,11 +58,13 @@ from cinder import context
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE, _LI, _LW
from cinder.image import cache as image_cache
from cinder.image import glance
from cinder import manager
from cinder import objects
from cinder import quota
from cinder import utils
from cinder import volume as cinder_volume
from cinder.volume import configuration as config
from cinder.volume.flows.manager import create_volume
from cinder.volume.flows.manager import manage_existing
@ -234,6 +236,30 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.error(_LE("Invalid JSON: %s"),
self.driver.configuration.extra_capabilities)
if self.driver.configuration.safe_get(
'image_volume_cache_enabled'):
max_cache_size = self.driver.configuration.safe_get(
'image_volume_cache_max_size_gb')
max_cache_entries = self.driver.configuration.safe_get(
'image_volume_cache_max_count')
self.image_volume_cache = image_cache.ImageVolumeCache(
self.db,
cinder_volume.API(),
max_cache_size,
max_cache_entries
)
LOG.info(_LI('Image-volume cache enabled for host %(host)s'),
{'host': self.host})
else:
LOG.info(_LI('Image-volume cache disabled for host %(host)s'),
{'host': self.host})
self.image_volume_cache = None
def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)
def _count_allocated_capacity(self, ctxt, volume):
pool = vol_utils.extract_host(volume['host'], 'pool')
if pool is None:
@ -446,6 +472,7 @@ class VolumeManager(manager.SchedulerDependentManager):
# verified by the task itself.
flow_engine = create_volume.get_flow(
context_elevated,
self,
self.db,
self.driver,
self.scheduler_rpcapi,
@ -454,7 +481,9 @@ class VolumeManager(manager.SchedulerDependentManager):
allow_reschedule,
context,
request_spec,
filter_properties)
filter_properties,
image_volume_cache=self.image_volume_cache,
)
except Exception:
msg = _("Create manager volume flow failed.")
LOG.exception(msg, resource={'type': 'volume', 'id': volume_id})
@ -970,6 +999,46 @@ class VolumeManager(manager.SchedulerDependentManager):
self._notify_about_volume_usage(context, volume, "detach.end")
LOG.info(_LI("Detach volume completed successfully."), resource=volume)
def _create_image_cache_volume_entry(self, ctx, volume_ref,
image_id, image_meta):
"""Create a new image-volume and cache entry for it.
This assumes that the image has already been downloaded and stored
in the volume described by the volume_ref.
"""
image_volume = None
try:
if not self.image_volume_cache.ensure_space(
ctx,
volume_ref['size'],
volume_ref['host']):
LOG.warning(_LW('Unable to ensure space for image-volume in'
' cache. Will skip creating entry for image'
' %(image)s on host %(host)s.'),
{'image': image_id, 'host': volume_ref['host']})
return
image_volume = self._clone_image_volume(ctx,
volume_ref,
image_meta)
if not image_volume:
LOG.warning(_LW('Unable to clone image_volume for image '
'%(image_id) will not create cache entry.'),
{'image_id': image_id})
return
self.image_volume_cache.create_cache_entry(
ctx,
image_volume,
image_id,
image_meta
)
except exception.CinderException as e:
LOG.warning(_LW('Failed to create new image-volume cache entry'
' Error: %(exception)s'), {'exception': e})
if image_volume:
self.delete_volume(ctx, image_volume.id)
def _clone_image_volume(self, ctx, volume, image_meta):
volume_type_id = volume.get('volume_type_id')
reserve_opts = {'volumes': 1, 'gigabytes': volume.size}