Honor metadata_encryption_key in glance domain

Since v2 is not using the registry client, it was not properly
encrypting image locations before storing them in the database. With
this change, the db layer in the glance domain now uses the metadata
encryption key as well.

To make this work, the declaration of the metadata_encryption_key option
had to move to glance.common.config to avoid circular module
dependencies.

Fixes bug 1128650

Change-Id: I3bb5da92ffda7bfe1bc064d11da8ecd4e6d9ab1d
This commit is contained in:
Mark J. Washenberger 2013-03-06 11:26:57 -08:00
parent 47362d7ef4
commit a7effe6dc2
6 changed files with 78 additions and 5 deletions

View File

@ -64,6 +64,9 @@ common_opts = [
cfg.IntOpt('pydev_worker_debug_port', default=5678,
help=_('The port on which a pydev process is listening for '
'connections.')),
cfg.StrOpt('metadata_encryption_key', secret=True,
help=_('Key used for encrypting sensitive metadata while '
'talking to the registry or database.')),
]
CONF = cfg.CONF

View File

@ -19,6 +19,7 @@
from oslo.config import cfg
from glance.common import crypt
from glance.common import exception
import glance.domain
import glance.domain.proxy
@ -34,6 +35,7 @@ sql_connection_opt = cfg.StrOpt('sql_connection',
CONF = cfg.CONF
CONF.register_opt(sql_connection_opt)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
def add_cli_options():
@ -99,6 +101,10 @@ class ImageRepo(object):
# NOTE(markwash) db api requires us to filter deleted
if not prop['deleted']:
properties[prop['name']] = prop['value']
locations = db_image['locations']
if CONF.metadata_encryption_key:
key = CONF.metadata_encryption_key
locations = [crypt.urlsafe_decrypt(key, l) for l in locations]
return glance.domain.Image(
image_id=db_image['id'],
name=db_image['name'],
@ -109,7 +115,7 @@ class ImageRepo(object):
min_disk=db_image['min_disk'],
min_ram=db_image['min_ram'],
protected=db_image['protected'],
locations=db_image['locations'],
locations=locations,
checksum=db_image['checksum'],
owner=db_image['owner'],
disk_format=db_image['disk_format'],
@ -120,6 +126,10 @@ class ImageRepo(object):
)
def _format_image_to_db(self, image):
locations = image.locations
if CONF.metadata_encryption_key:
key = CONF.metadata_encryption_key
locations = [crypt.urlsafe_encrypt(key, l) for l in locations]
return {
'id': image.image_id,
'name': image.name,
@ -128,7 +138,7 @@ class ImageRepo(object):
'min_disk': image.min_disk,
'min_ram': image.min_ram,
'protected': image.protected,
'locations': image.locations,
'locations': locations,
'checksum': image.checksum,
'owner': image.owner,
'disk_format': image.disk_format,

View File

@ -43,7 +43,7 @@ import glance.store.swift
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('metadata_encryption_key', 'glance.registry')
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
def upgrade(migrate_engine):

View File

@ -40,7 +40,6 @@ registry_client_opts = [
cfg.StrOpt('registry_client_ca_file'),
cfg.BoolOpt('registry_client_insecure', default=False),
cfg.IntOpt('registry_client_timeout', default=600),
cfg.StrOpt('metadata_encryption_key', secret=True),
]
registry_client_ctx_opts = [
cfg.StrOpt('admin_user', secret=True),
@ -55,6 +54,7 @@ CONF = cfg.CONF
CONF.register_opts(registry_addr_opts)
CONF.register_opts(registry_client_opts)
CONF.register_opts(registry_client_ctx_opts)
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
_CLIENT_CREDS = None
_CLIENT_HOST = None

View File

@ -13,6 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from glance.common import crypt
from glance.common import exception
import glance.context
import glance.db
@ -21,6 +24,9 @@ import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
CONF = cfg.CONF
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'
UUID3 = '971ec09a-8067-4bc8-a91f-ae3557f1c4c7'
@ -212,6 +218,60 @@ class TestImageRepo(test_utils.BaseTestCase):
self.assertRaises(exception.NotFound, self.image_repo.get, UUID1)
class TestEncryptedLocations(test_utils.BaseTestCase):
def setUp(self):
super(TestEncryptedLocations, self).setUp()
self.db = unit_test_utils.FakeDB()
self.db.reset()
self.context = glance.context.RequestContext(
user=USER1, tenant=TENANT1)
self.image_repo = glance.db.ImageRepo(self.context, self.db)
self.image_factory = glance.domain.ImageFactory()
self.crypt_key = '0123456789abcdef'
self.config(metadata_encryption_key=self.crypt_key)
def test_encrypt_locations_on_add(self):
image = self.image_factory.new_image(UUID1)
image.locations = ['foo', 'bar']
self.image_repo.add(image)
db_data = self.db.image_get(self.context, UUID1)
self.assertNotEqual(db_data['locations'], ['foo', 'bar'])
decrypted_locations = [crypt.urlsafe_decrypt(self.crypt_key, l)
for l in db_data['locations']]
self.assertEqual(decrypted_locations, ['foo', 'bar'])
def test_encrypt_locations_on_save(self):
image = self.image_factory.new_image(UUID1)
self.image_repo.add(image)
image.locations = ['foo', 'bar']
self.image_repo.save(image)
db_data = self.db.image_get(self.context, UUID1)
self.assertNotEqual(db_data['locations'], ['foo', 'bar'])
decrypted_locations = [crypt.urlsafe_decrypt(self.crypt_key, l)
for l in db_data['locations']]
self.assertEqual(decrypted_locations, ['foo', 'bar'])
def test_decrypt_locations_on_get(self):
encrypted_locations = [crypt.urlsafe_encrypt(self.crypt_key, l)
for l in ['ping', 'pong']]
self.assertNotEqual(encrypted_locations, ['ping', 'pong'])
db_data = _db_fixture(UUID1, owner=TENANT1,
locations=encrypted_locations)
self.db.image_create(None, db_data)
image = self.image_repo.get(UUID1)
self.assertEqual(image.locations, ['ping', 'pong'])
def test_decrypt_locations_on_list(self):
encrypted_locations = [crypt.urlsafe_encrypt(self.crypt_key, l)
for l in ['ping', 'pong']]
self.assertNotEqual(encrypted_locations, ['ping', 'pong'])
db_data = _db_fixture(UUID1, owner=TENANT1,
locations=encrypted_locations)
self.db.image_create(None, db_data)
image = self.image_repo.list()[0]
self.assertEqual(image.locations, ['ping', 'pong'])
class TestImageMemberRepo(test_utils.BaseTestCase):
def setUp(self):

View File

@ -47,7 +47,7 @@ from glance.tests import utils
CONF = cfg.CONF
CONF.import_opt('metadata_encryption_key', 'glance.registry')
CONF.import_opt('metadata_encryption_key', 'glance.common.config')
LOG = logging.getLogger(__name__)