Unit/Functional tests for multi store support

Added some unit tests for coverage purposes.
Added functional tests for create and import scenarios.

Note:
For functional tests I have considered file store with two
different image directories.

Related to blueprint multi-store
Change-Id: I59e28ab822fb5f6940f48ddbf6dfba4cb7d4c509
This commit is contained in:
Abhishek Kekane 2018-06-22 16:07:31 +00:00
parent cb45edf5c8
commit 73109de485
6 changed files with 1817 additions and 0 deletions

View File

@ -445,6 +445,191 @@ allowed_origin=http://valid.example.com
"""
class ApiServerForMultipleBackend(Server):
    """
    Server object that starts/stops/manages the API server
    configured with multiple (two filesystem) store backends.
    """

    def __init__(self, test_dir, port, policy_file, delayed_delete=False,
                 pid_file=None, sock=None, **kwargs):
        super(ApiServerForMultipleBackend, self).__init__(
            test_dir, port, sock=sock)
        self.server_name = 'api'
        self.server_module = 'glance.cmd.%s' % self.server_name
        # Backend used when a request does not name a store explicitly.
        self.default_backend = kwargs.get("default_backend", "file1")
        self.bind_host = "127.0.0.1"
        self.registry_host = "127.0.0.1"
        self.key_file = ""
        self.cert_file = ""
        self.metadata_encryption_key = "012345678901234567890123456789ab"
        # Two distinct image directories, one per configured file store.
        self.image_dir_backend_1 = os.path.join(self.test_dir, "images_1")
        self.image_dir_backend_2 = os.path.join(self.test_dir, "images_2")
        self.pid_file = pid_file or os.path.join(self.test_dir,
                                                 "multiple_backend_api.pid")
        self.log_file = os.path.join(self.test_dir, "multiple_backend_api.log")
        self.image_size_cap = 1099511627776  # 2**40 bytes (1 TiB)
        self.delayed_delete = delayed_delete
        self.owner_is_tenant = True
        self.workers = 0
        self.scrub_time = 5
        self.image_cache_dir = os.path.join(self.test_dir,
                                            'cache')
        self.image_cache_driver = 'sqlite'
        self.policy_file = policy_file
        self.policy_default_rule = 'default'
        self.property_protection_rule_format = 'roles'
        self.image_member_quota = 10
        self.image_property_quota = 10
        self.image_tag_quota = 10
        self.image_location_quota = 2
        self.disable_path = None
        self.needs_database = True
        # Fall back to a per-test sqlite DB unless the environment
        # supplies a connection string (e.g. for MySQL-backed runs).
        default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
        self.sql_connection = os.environ.get('GLANCE_TEST_SQL_CONNECTION',
                                             default_sql_connection)
        self.data_api = kwargs.get("data_api",
                                   "glance.db.sqlalchemy.api")
        self.user_storage_quota = '0'
        self.lock_path = self.test_dir
        self.location_strategy = 'location_order'
        self.store_type_location_strategy_preference = ""
        self.send_identity_headers = False
        # Config-file template; %(...)s placeholders are filled from this
        # object's attributes when the server is started. Note the two
        # [file1]/[file2] sections matching enabled_backends below.
        self.conf_base = """[DEFAULT]
debug = %(debug)s
default_log_levels = eventlet.wsgi.server=DEBUG
bind_host = %(bind_host)s
bind_port = %(bind_port)s
key_file = %(key_file)s
cert_file = %(cert_file)s
metadata_encryption_key = %(metadata_encryption_key)s
registry_host = %(registry_host)s
registry_port = %(registry_port)s
use_user_token = %(use_user_token)s
send_identity_credentials = %(send_identity_credentials)s
log_file = %(log_file)s
image_size_cap = %(image_size_cap)d
delayed_delete = %(delayed_delete)s
owner_is_tenant = %(owner_is_tenant)s
workers = %(workers)s
scrub_time = %(scrub_time)s
send_identity_headers = %(send_identity_headers)s
image_cache_dir = %(image_cache_dir)s
image_cache_driver = %(image_cache_driver)s
data_api = %(data_api)s
sql_connection = %(sql_connection)s
show_image_direct_url = %(show_image_direct_url)s
show_multiple_locations = %(show_multiple_locations)s
user_storage_quota = %(user_storage_quota)s
enable_v2_api = %(enable_v2_api)s
lock_path = %(lock_path)s
property_protection_file = %(property_protection_file)s
property_protection_rule_format = %(property_protection_rule_format)s
image_member_quota=%(image_member_quota)s
image_property_quota=%(image_property_quota)s
image_tag_quota=%(image_tag_quota)s
image_location_quota=%(image_location_quota)s
location_strategy=%(location_strategy)s
allow_additional_image_properties = True
enabled_backends=file1:file, file2:file
[oslo_policy]
policy_file = %(policy_file)s
policy_default_rule = %(policy_default_rule)s
[paste_deploy]
flavor = %(deployment_flavor)s
[store_type_location_strategy]
store_type_preference = %(store_type_location_strategy_preference)s
[glance_store]
default_backend = %(default_backend)s
[file1]
filesystem_store_datadir=%(image_dir_backend_1)s
[file2]
filesystem_store_datadir=%(image_dir_backend_2)s
"""
        # Paste pipeline template selecting the middleware stack per
        # deployment flavor (caching, cache management, fake auth, ...).
        self.paste_conf_base = """[pipeline:glance-api]
pipeline =
cors
healthcheck
versionnegotiation
gzip
unauthenticated-context
rootapp
[pipeline:glance-api-caching]
pipeline = cors healthcheck versionnegotiation gzip unauthenticated-context
cache rootapp
[pipeline:glance-api-cachemanagement]
pipeline =
cors
healthcheck
versionnegotiation
gzip
unauthenticated-context
cache
cache_manage
rootapp
[pipeline:glance-api-fakeauth]
pipeline = cors healthcheck versionnegotiation gzip fakeauth context rootapp
[pipeline:glance-api-noauth]
pipeline = cors healthcheck versionnegotiation gzip context rootapp
[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app
[app:apiversions]
paste.app_factory = glance.api.versions:create_resource
[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory
[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
[filter:healthcheck]
paste.filter_factory = oslo_middleware:Healthcheck.factory
backends = disable_by_file
disable_by_file_path = %(disable_path)s
[filter:versionnegotiation]
paste.filter_factory =
glance.api.middleware.version_negotiation:VersionNegotiationFilter.factory
[filter:gzip]
paste.filter_factory = glance.api.middleware.gzip:GzipMiddleware.factory
[filter:cache]
paste.filter_factory = glance.api.middleware.cache:CacheFilter.factory
[filter:cache_manage]
paste.filter_factory =
glance.api.middleware.cache_manage:CacheManageFilter.factory
[filter:context]
paste.filter_factory = glance.api.middleware.context:ContextMiddleware.factory
[filter:unauthenticated-context]
paste.filter_factory =
glance.api.middleware.context:UnauthenticatedContextMiddleware.factory
[filter:fakeauth]
paste.filter_factory = glance.tests.utils:FakeAuthMiddleware.factory
[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
allowed_origin=http://valid.example.com
"""
class RegistryServer(Server):
"""
@ -946,3 +1131,374 @@ class FunctionalTest(test_utils.BaseTestCase):
self._attached_server_logs.append(s.log_file)
self.addDetail(
s.server_name, testtools.content.text_content(s.dump_log()))
class MultipleBackendFunctionalTest(test_utils.BaseTestCase):

    """
    Base test class for any test that wants to test the actual
    servers and clients and not just the stubbed out interfaces,
    with multiple store backends enabled on the API server.
    """

    inited = False
    disabled = False
    launched_servers = []

    def setUp(self):
        super(MultipleBackendFunctionalTest, self).setUp()
        self.test_dir = self.useFixture(fixtures.TempDir()).path

        self.api_protocol = 'http'
        self.api_port, api_sock = test_utils.get_unused_port_and_socket()
        self.registry_port, reg_sock = test_utils.get_unused_port_and_socket()

        # NOTE: Scrubber is enabled by default for the functional tests.
        # Please disable it by explicitly setting 'self.include_scrubber' to
        # False in the test SetUps that do not require Scrubber to run.
        self.include_scrubber = True

        # Platform-specific syscall tracer (e.g. strace); used to capture
        # diagnostics when a server fails to come up.
        self.tracecmd = tracecmd_osmap.get(platform.system())

        conf_dir = os.path.join(self.test_dir, 'etc')
        utils.safe_mkdirs(conf_dir)
        self.copy_data_file('schema-image.json', conf_dir)
        self.copy_data_file('policy.json', conf_dir)
        self.copy_data_file('property-protections.conf', conf_dir)
        self.copy_data_file('property-protections-policies.conf', conf_dir)
        self.property_file_roles = os.path.join(conf_dir,
                                                'property-protections.conf')
        property_policies = 'property-protections-policies.conf'
        self.property_file_policies = os.path.join(conf_dir,
                                                   property_policies)
        self.policy_file = os.path.join(conf_dir, 'policy.json')

        self.api_server_multiple_backend = ApiServerForMultipleBackend(
            self.test_dir, self.api_port, self.policy_file, sock=api_sock)

        self.registry_server = RegistryServer(self.test_dir,
                                              self.registry_port,
                                              self.policy_file,
                                              sock=reg_sock)

        self.scrubber_daemon = ScrubberDaemon(self.test_dir, self.policy_file)

        self.pid_files = [self.api_server_multiple_backend.pid_file,
                          self.registry_server.pid_file,
                          self.scrubber_daemon.pid_file]
        self.files_to_destroy = []
        self.launched_servers = []
        # Keep track of servers we've logged so we don't double-log them.
        self._attached_server_logs = []
        self.addOnException(self.add_log_details_on_exception)

        if not self.disabled:
            # We destroy the test data store between each test case,
            # and recreate it, which ensures that we have no side-effects
            # from the tests
            self.addCleanup(
                self._reset_database, self.registry_server.sql_connection)
            self.addCleanup(
                self._reset_database,
                self.api_server_multiple_backend.sql_connection)
            self.addCleanup(self.cleanup)
            self._reset_database(self.registry_server.sql_connection)
            self._reset_database(
                self.api_server_multiple_backend.sql_connection)

    def set_policy_rules(self, rules):
        """Overwrite the test policy file with the given rules dict."""
        fap = open(self.policy_file, 'w')
        fap.write(jsonutils.dumps(rules))
        fap.close()

    def _reset_database(self, conn_string):
        """Drop and recreate the test database behind *conn_string*.

        Only sqlite and mysql schemes are handled; sqlite files are
        deliberately left in place (see comment below).
        """
        conn_pieces = urlparse.urlparse(conn_string)
        if conn_string.startswith('sqlite'):
            # We leave behind the sqlite DB for failing tests to aid
            # in diagnosis, as the file size is relatively small and
            # won't interfere with subsequent tests as it's in a per-
            # test directory (which is blown-away if the test is green)
            pass
        elif conn_string.startswith('mysql'):
            # We can execute the MySQL client to destroy and re-create
            # the MYSQL database, which is easier and less error-prone
            # than using SQLAlchemy to do this via MetaData...trust me.
            database = conn_pieces.path.strip('/')
            loc_pieces = conn_pieces.netloc.split('@')
            host = loc_pieces[1]
            auth_pieces = loc_pieces[0].split(':')
            user = auth_pieces[0]
            password = ""
            if len(auth_pieces) > 1:
                if auth_pieces[1].strip():
                    password = "-p%s" % auth_pieces[1]
            sql = ("drop database if exists %(database)s; "
                   "create database %(database)s;") % {'database': database}
            cmd = ("mysql -u%(user)s %(password)s -h%(host)s "
                   "-e\"%(sql)s\"") % {'user': user, 'password': password,
                                       'host': host, 'sql': sql}
            exitcode, out, err = execute(cmd)
            self.assertEqual(0, exitcode)

    def cleanup(self):
        """
        Makes sure anything we created or started up in the
        tests are destroyed or spun down
        """
        # NOTE(jbresnah) call stop on each of the servers instead of
        # checking the pid file. stop() will wait until the child
        # server is dead. This eliminates the possibility of a race
        # between a child process listening on a port actually dying
        # and a new process being started
        servers = [self.api_server_multiple_backend,
                   self.registry_server,
                   self.scrubber_daemon]
        for s in servers:
            try:
                s.stop()
            except Exception:
                # Best-effort teardown: a server that never started (or
                # already died) must not mask the real test failure.
                pass

        for f in self.files_to_destroy:
            if os.path.exists(f):
                os.unlink(f)

    def start_server(self,
                     server,
                     expect_launch,
                     expect_exit=True,
                     expected_exitcode=0,
                     **kwargs):
        """
        Starts a server on an unused port.

        Any kwargs passed to this method will override the configuration
        value in the conf file used in starting the server.

        :param server: the server to launch
        :param expect_launch: true iff the server is expected to
                              successfully start
        :param expect_exit: true iff the launched process is expected
                            to exit in a timely fashion
        :param expected_exitcode: expected exitcode from the launcher
        """
        self.cleanup()

        # Start up the requested server
        exitcode, out, err = server.start(expect_exit=expect_exit,
                                          expected_exitcode=expected_exitcode,
                                          **kwargs)
        if expect_exit:
            self.assertEqual(expected_exitcode, exitcode,
                             "Failed to spin up the requested server. "
                             "Got: %s" % err)

        self.launched_servers.append(server)

        launch_msg = self.wait_for_servers([server], expect_launch)
        self.assertTrue(launch_msg is None, launch_msg)

    def start_with_retry(self, server, port_name, max_retries,
                         expect_launch=True,
                         **kwargs):
        """
        Starts a server, with retries if the server launches but
        fails to start listening on the expected port.

        :param server: the server to launch
        :param port_name: the name of the port attribute
        :param max_retries: the maximum number of attempts
        :param expect_launch: true iff the server is expected to
                              successfully start
        :param expect_exit: true iff the launched process is expected
                            to exit in a timely fashion
        """
        launch_msg = None
        for i in range(max_retries):
            exitcode, out, err = server.start(expect_exit=not expect_launch,
                                              **kwargs)
            name = server.server_name
            self.assertEqual(0, exitcode,
                             "Failed to spin up the %s server. "
                             "Got: %s" % (name, err))
            launch_msg = self.wait_for_servers([server], expect_launch)
            if launch_msg:
                # Port clash: stop, rebind to a fresh port and retry.
                server.stop()
                server.bind_port = get_unused_port()
                setattr(self, port_name, server.bind_port)
            else:
                self.launched_servers.append(server)
                break
        self.assertTrue(launch_msg is None, launch_msg)

    def start_servers(self, **kwargs):
        """
        Starts the API and Registry servers (glance-control api start
        & glance-control registry start) on unused ports.  glance-control
        should be installed into the python path

        Any kwargs passed to this method will override the configuration
        value in the conf file used in starting the servers.
        """
        self.cleanup()

        # Start up the API and default registry server

        # We start the registry server first, as the API server config
        # depends on the registry port - this ordering allows for
        # retrying the launch on a port clash
        self.start_with_retry(self.registry_server, 'registry_port', 3,
                              **kwargs)
        kwargs['registry_port'] = self.registry_server.bind_port

        self.start_with_retry(self.api_server_multiple_backend,
                              'api_port', 3, **kwargs)

        if self.include_scrubber:
            exitcode, out, err = self.scrubber_daemon.start(**kwargs)
            self.assertEqual(0, exitcode,
                             "Failed to spin up the Scrubber daemon. "
                             "Got: %s" % err)

    def ping_server(self, port):
        """
        Simple ping on the port. If responsive, return True, else
        return False.

        :note We use raw sockets, not ping here, since ping uses ICMP and
        has no concept of ports...
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(("127.0.0.1", port))
            return True
        except socket.error:
            return False
        finally:
            s.close()

    def ping_server_ipv6(self, port):
        """
        Simple ping on the port. If responsive, return True, else
        return False.

        :note We use raw sockets, not ping here, since ping uses ICMP and
        has no concept of ports...

        The function uses IPv6 (therefore AF_INET6 and ::1).
        """
        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        try:
            s.connect(("::1", port))
            return True
        except socket.error:
            return False
        finally:
            s.close()

    def wait_for_servers(self, servers, expect_launch=True, timeout=30):
        """
        Tight loop, waiting for the given server port(s) to be available.
        Returns when all are pingable. There is a timeout on waiting
        for the servers to come up.

        :param servers: Glance server ports to ping
        :param expect_launch: Optional, true iff the server(s) are
                              expected to successfully start
        :param timeout: Optional, defaults to 30 seconds
        :returns: None if launch expectation is met, otherwise an
                  assertion message
        """
        now = datetime.datetime.now()
        timeout_time = now + datetime.timedelta(seconds=timeout)
        replied = []
        while (timeout_time > now):
            pinged = 0
            for server in servers:
                if self.ping_server(server.bind_port):
                    pinged += 1
                    if server not in replied:
                        replied.append(server)
            if pinged == len(servers):
                msg = 'Unexpected server launch status'
                return None if expect_launch else msg
            now = datetime.datetime.now()
            time.sleep(0.05)

        # Timed out: report which servers never replied, attaching a
        # syscall trace of the stuck process where a tracer is available.
        failed = list(set(servers) - set(replied))
        msg = 'Unexpected server launch status for: '
        for f in failed:
            msg += ('%s, ' % f.server_name)
            if os.path.exists(f.pid_file):
                pid = f.process_pid
                trace = f.pid_file.replace('.pid', '.trace')
                if self.tracecmd:
                    cmd = '%s -p %d -o %s' % (self.tracecmd, pid, trace)
                    try:
                        execute(cmd, raise_error=False, expect_exit=False)
                    except OSError as e:
                        if e.errno == errno.ENOENT:
                            raise RuntimeError('No executable found for "%s" '
                                               'command.' % self.tracecmd)
                        else:
                            raise
                    time.sleep(0.5)
                    if os.path.exists(trace):
                        msg += ('\n%s:\n%s\n' % (self.tracecmd,
                                                 open(trace).read()))

        self.add_log_details(failed)

        return msg if expect_launch else None

    def stop_server(self, server):
        """
        Called to stop a single server in a normal fashion using the
        glance-control stop method to gracefully shut the server down.

        :param server: the server to stop
        """
        # Spin down the requested server
        server.stop()

    def stop_servers(self):
        """
        Called to stop the started servers in a normal fashion. Note
        that cleanup() will stop the servers using a fairly draconian
        method of sending a SIGTERM signal to the servers. Here, we use
        the glance-control stop method to gracefully shut the server down.
        This method also asserts that the shutdown was clean, and so it
        is meant to be called during a normal test case sequence.
        """
        # Spin down the API and default registry server
        self.stop_server(self.api_server_multiple_backend)
        self.stop_server(self.registry_server)
        if self.include_scrubber:
            self.stop_server(self.scrubber_daemon)

        self._reset_database(self.registry_server.sql_connection)

    def run_sql_cmd(self, sql):
        """
        Provides a crude mechanism to run manual SQL commands for backend
        DB verification within the functional tests.
        The raw result set is returned.
        """
        engine = db_api.get_engine()
        return engine.execute(sql)

    def copy_data_file(self, file_name, dst_dir):
        """Copy a fixture file from glance/tests/etc into *dst_dir*.

        :returns: the destination path of the copied file
        """
        src_file_name = os.path.join('glance/tests/etc', file_name)
        shutil.copy(src_file_name, dst_dir)
        dst_file_name = os.path.join(dst_dir, file_name)
        return dst_file_name

    def add_log_details_on_exception(self, *args, **kwargs):
        # testtools on-exception hook; signature must accept the
        # (exc_info) arguments testtools passes, which we ignore.
        self.add_log_details()

    def add_log_details(self, servers=None):
        """Attach each (given or launched) server's log to the test result."""
        for s in servers or self.launched_servers:
            if s.log_file not in self._attached_server_logs:
                self._attached_server_logs.append(s.log_file)
            self.addDetail(
                s.server_name, testtools.content.text_content(s.dump_log()))

File diff suppressed because it is too large Load Diff

View File

@ -54,6 +54,34 @@ class StoreClearingUnitTest(test_utils.BaseTestCase):
store.create_stores(CONF)
class MultiStoreClearingUnitTest(test_utils.BaseTestCase):
    """Base test case that registers and creates multiple glance_store
    backends (a 'file' store and an 'rbd' store) for multi-store tests.
    """

    def setUp(self):
        super(MultiStoreClearingUnitTest, self).setUp()
        # Ensure stores + locations cleared
        location.SCHEME_TO_CLS_BACKEND_MAP = {}

        self._create_multi_stores()
        # NOTE: restore the *multi-store* scheme map on cleanup; setUp
        # cleared SCHEME_TO_CLS_BACKEND_MAP (not the single-store
        # SCHEME_TO_CLS_MAP), so that is the attribute to reset.
        self.addCleanup(setattr, location, 'SCHEME_TO_CLS_BACKEND_MAP',
                        dict())

    def _create_multi_stores(self, passing_config=True):
        """Register and create the known multi-store backends.

        Enables a filesystem backend ('file1') and an rbd backend
        ('ceph1'), with 'file1' as the default backend.

        :param passing_config: making store driver passes basic
            configurations (kept for signature compatibility; the
            configuration given here always passes).
        """
        self.config(enabled_backends={'file1': 'file', 'ceph1': 'rbd'})
        store.register_store_opts(CONF)
        self.config(default_backend='file1',
                    group='glance_store')
        self.config(filesystem_store_datadir=self.test_dir,
                    group='file1')

        store.create_multi_stores(CONF)
class IsolatedUnitTest(StoreClearingUnitTest):
"""
@ -82,3 +110,27 @@ class IsolatedUnitTest(StoreClearingUnitTest):
fap = open(CONF.oslo_policy.policy_file, 'w')
fap.write(jsonutils.dumps(rules))
fap.close()
class MultiIsolatedUnitTest(MultiStoreClearingUnitTest):
    """
    Unit test case that establishes a mock environment within
    a testing directory (in isolation) with multiple store
    backends enabled.
    """
    registry = None

    def setUp(self):
        super(MultiIsolatedUnitTest, self).setUp()
        options.set_defaults(CONF, connection='sqlite://')
        lockutils.set_defaults(os.path.join(self.test_dir))
        self.config(debug=False)
        stubs.stub_out_registry_and_store_server(self.stubs,
                                                 self.test_dir,
                                                 registry=self.registry)

    def set_policy_rules(self, rules):
        """Overwrite the configured policy file with the given rules dict."""
        # Use a context manager so the file handle is closed even if
        # serialization raises.
        with open(CONF.oslo_policy.policy_file, 'w') as fap:
            fap.write(jsonutils.dumps(rules))

View File

@ -0,0 +1,48 @@
# Copyright (c) 2018-2019 RedHat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
import webob.exc
import glance.api.v2.discovery
from glance.tests.unit import base
import glance.tests.unit.utils as unit_test_utils
CONF = cfg.CONF
class TestInfoControllers(base.MultiStoreClearingUnitTest):
    """Tests for the v2 discovery (stores info) controller when
    multiple backends are configured.
    """

    def setUp(self):
        super(TestInfoControllers, self).setUp()
        self.controller = glance.api.v2.discovery.InfoController()

    # NOTE: no tearDown override — the previous one only delegated to
    # super(), which the test runner already calls.

    def test_get_stores_with_enabled_backends_empty(self):
        """A 404 is raised when no backends are enabled."""
        self.config(enabled_backends={})
        req = unit_test_utils.get_fake_request()
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.controller.get_stores,
                          req)

    def test_get_stores(self):
        """Every discovered store id comes from the enabled backends."""
        available_stores = ['ceph1', 'file1']
        req = unit_test_utils.get_fake_request()
        output = self.controller.get_stores(req)
        self.assertIn('stores', output)
        for stores in output['stores']:
            self.assertIn('id', stores)
            self.assertIn(stores['id'], available_stores)

View File

@ -993,3 +993,35 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
self.serializer.stage(response, {})
self.assertEqual(http.NO_CONTENT, response.status_int)
self.assertEqual('0', response.headers['Content-Length'])
class TestMultiBackendImagesController(base.MultiStoreClearingUnitTest):
    """Tests for image data upload when multiple backends are enabled."""

    def setUp(self):
        super(TestMultiBackendImagesController, self).setUp()
        self.config(debug=True)
        self.image_repo = FakeImageRepo()
        fake_db = unit_test_utils.FakeDB()
        fake_policy = unit_test_utils.FakePolicyEnforcer()
        fake_notifier = unit_test_utils.FakeNotifier()
        fake_store = unit_test_utils.FakeStoreAPI()
        self.controller = glance.api.v2.image_data.ImageDataController()
        self.controller.gateway = FakeGateway(fake_db, fake_store,
                                              fake_notifier, fake_policy,
                                              self.image_repo)

    def test_upload(self):
        """A plain upload stores the data and records its size."""
        request = unit_test_utils.get_fake_request()
        image = FakeImage('abcd')
        self.image_repo.result = image
        self.controller.upload(request, unit_test_utils.UUID2, 'YYYY', 4)
        self.assertEqual('YYYY', image.data)
        self.assertEqual(4, image.size)

    def test_upload_invalid_backend_in_request_header(self):
        """An unknown store named in the request header is rejected."""
        request = unit_test_utils.get_fake_request()
        request.headers['x-image-meta-store'] = 'dummy'
        image = FakeImage('abcd')
        self.image_repo.result = image
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.upload,
                          request, unit_test_utils.UUID2, 'YYYY', 4)

View File

@ -4267,3 +4267,112 @@ class TestImageSchemaDeterminePropertyBasis(test_utils.BaseTestCase):
    def test_base_property_marked_as_base(self):
        """A core schema property such as disk_format is flagged as base."""
        schema = glance.api.v2.images.get_schema()
        # The default of True treats an absent 'is_base' key as "base",
        # so only an explicit is_base=False would fail this assertion.
        self.assertTrue(schema.properties['disk_format'].get('is_base', True))
class TestMultiImagesController(base.MultiIsolatedUnitTest):
    """Tests for the images controller import flow when multiple
    backends are enabled.
    """

    def setUp(self):
        super(TestMultiImagesController, self).setUp()
        self.db = unit_test_utils.FakeDB(initialize=False)
        self.policy = unit_test_utils.FakePolicyEnforcer()
        self.notifier = unit_test_utils.FakeNotifier()
        self.store = store
        self._create_images()
        self._create_image_members()
        self.controller = glance.api.v2.images.ImagesController(self.db,
                                                                self.policy,
                                                                self.notifier,
                                                                self.store)

    def _create_images(self):
        """Seed the fake DB with image fixtures in various states."""
        self.images = [
            _db_fixture(UUID1, owner=TENANT1, checksum=CHKSUM,
                        name='1', size=256, virtual_size=1024,
                        visibility='public',
                        locations=[{'url': '%s/%s' % (BASE_URI, UUID1),
                                    'metadata': {}, 'status': 'active'}],
                        disk_format='raw',
                        container_format='bare',
                        status='active'),
            _db_fixture(UUID2, owner=TENANT1, checksum=CHKSUM1,
                        name='2', size=512, virtual_size=2048,
                        visibility='public',
                        disk_format='raw',
                        container_format='bare',
                        status='active',
                        tags=['redhat', '64bit', 'power'],
                        properties={'hypervisor_type': 'kvm', 'foo': 'bar',
                                    'bar': 'foo'}),
            _db_fixture(UUID3, owner=TENANT3, checksum=CHKSUM1,
                        name='3', size=512, virtual_size=2048,
                        visibility='public', tags=['windows', '64bit', 'x86']),
            _db_fixture(UUID4, owner=TENANT4, name='4',
                        size=1024, virtual_size=3072),
        ]
        # Plain loop rather than a list comprehension: the calls are
        # executed for their side effect only.
        for image in self.images:
            self.db.image_create(None, image)

        self.db.image_tag_set_all(None, UUID1, ['ping', 'pong'])

    def _create_image_members(self):
        """Seed the fake DB with membership records for UUID4."""
        self.image_members = [
            _db_image_member_fixture(UUID4, TENANT2),
            _db_image_member_fixture(UUID4, TENANT3,
                                     status='accepted'),
        ]
        # Plain loop rather than a list comprehension: side effects only.
        for image_member in self.image_members:
            self.db.image_member_create(None, image_member)

    def test_image_import_image_not_exist(self):
        """Importing an unknown image id raises 404."""
        request = unit_test_utils.get_fake_request()
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.controller.import_image,
                          request, 'invalid_image',
                          {'method': {'name': 'glance-direct'}})

    def test_image_import_with_active_image(self):
        """glance-direct import of an already-active image raises 409."""
        request = unit_test_utils.get_fake_request()
        self.assertRaises(webob.exc.HTTPConflict,
                          self.controller.import_image,
                          request, UUID1,
                          {'method': {'name': 'glance-direct'}})

    def test_image_import_invalid_backend_in_request_header(self):
        """An unknown store in the request header makes the import fail."""
        request = unit_test_utils.get_fake_request()
        request.headers['x-image-meta-store'] = 'dummy'
        with mock.patch.object(
                glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
            mock_get.return_value = FakeImage(status='uploading')
            self.assertRaises(webob.exc.HTTPConflict,
                              self.controller.import_image,
                              request, UUID4,
                              {'method': {'name': 'glance-direct'}})

    def test_image_import_raises_conflict_if_disk_format_is_none(self):
        """Import of an image with no disk_format raises 409."""
        request = unit_test_utils.get_fake_request()

        with mock.patch.object(
                glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
            mock_get.return_value = FakeImage(disk_format=None)

            self.assertRaises(webob.exc.HTTPConflict,
                              self.controller.import_image, request, UUID4,
                              {'method': {'name': 'glance-direct'}})

    def test_image_import_raises_conflict(self):
        """glance-direct import of a queued image raises 409."""
        request = unit_test_utils.get_fake_request()

        with mock.patch.object(
                glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
            mock_get.return_value = FakeImage(status='queued')

            self.assertRaises(webob.exc.HTTPConflict,
                              self.controller.import_image, request, UUID4,
                              {'method': {'name': 'glance-direct'}})

    def test_image_import_raises_conflict_for_web_download(self):
        """web-download import in the wrong image state raises 409."""
        request = unit_test_utils.get_fake_request()

        with mock.patch.object(
                glance.api.authorization.ImageRepoProxy, 'get') as mock_get:
            mock_get.return_value = FakeImage()

            self.assertRaises(webob.exc.HTTPConflict,
                              self.controller.import_image, request, UUID4,
                              {'method': {'name': 'web-download'}})