diff --git a/glare/common/store_api.py b/glare/common/store_api.py
index bfa5179..aca6103 100644
--- a/glare/common/store_api.py
+++ b/glare/common/store_api.py
@@ -90,6 +90,43 @@ def save_blob_to_store(blob_id, blob, context, max_size,
     return location, data.bytes_read, checksums
 
 
+@utils.error_handler(error_map)
+def save_blobs_to_store(blobs, context, max_size,
+                        store_type=None, verifier=None):
+    """Save several files to specified store.
+
+    :param store_type: type of the store, None means save to default store.
+    :param blobs: list of tuples (blob_data_id, data)
+    :param context: user context
+    :param verifier: signature verifier
+    :return: dict {blob_data_id: (location_uri, size, checksums)}
+    """
+    # wrap data in CooperativeReader
+    blobs = [(blob_data_id,
+              utils.LimitingReader(utils.CooperativeReader(data), max_size))
+             for (blob_data_id, data) in blobs]
+
+    if store_type == 'database':
+        locations = database_api.add_to_backend_batch(blobs, context, verifier)
+    else:
+        locations = []
+        for blob_data_id, data in blobs:
+            (location, __, __, __) = backend.add_to_backend(
+                CONF, blob_data_id, data, 0, store_type, context, verifier)
+            locations.append(location)
+
+    # combine location, size and checksums together
+    res = {}
+    for i in range(len(locations)):
+        data = blobs[i][1]
+        checksums = {"md5": data.md5.hexdigest(),
+                     "sha1": data.sha1.hexdigest(),
+                     "sha256": data.sha256.hexdigest()}
+        res[blobs[i][0]] = (locations[i], data.bytes_read, checksums)
+
+    return res
+
+
 @utils.error_handler(error_map)
 def load_from_store(uri, context):
     """Load file from store backend.
diff --git a/glare/db/sqlalchemy/api.py b/glare/db/sqlalchemy/api.py
index f4e08db..250f892 100644
--- a/glare/db/sqlalchemy/api.py
+++ b/glare/db/sqlalchemy/api.py
@@ -706,6 +706,27 @@ def save_blob_data(context, blob_data_id, data, session):
         return "sql://" + blob_data.id
 
 
+@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
+       stop_max_attempt_number=50)
+def save_blob_data_batch(context, blobs, session):
+    """Perform batch uploading to database."""
+    with session.begin():
+
+        locations = []
+
+        # blobs is a list of tuples (blob_data_id, data)
+        for blob_data_id, data in blobs:
+            blob_data = models.ArtifactBlobData()
+            blob_data.id = blob_data_id
+            blob_data.data = data.read()
+            session.add(blob_data)
+            locations.append("sql://" + blob_data.id)
+
+        session.flush()
+
+    return locations
+
+
 @retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
 def get_blob_data(context, uri, session):
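A minimal usage sketch of the new batch API (not part of the patch), showing the call shape of save_blobs_to_store and the layout of the dict it returns. The blob ids, payloads, max_size value and the surrounding `context` variable are illustrative assumptions; in this change the real caller is file_utils.unpack_zip_archive_to_artifact_folder, which passes the request context of the upload.

    from oslo_utils import uuidutils

    from glare.common import store_api
    from glare.common import utils

    # two small in-memory payloads; ids are generated the same way file_utils does
    blobs = [(uuidutils.generate_uuid(), utils.BlobIterator(b'first file')),
             (uuidutils.generate_uuid(), utils.BlobIterator(b'second file'))]

    # 'context' is assumed to be the authenticated request context of the upload;
    # store_type='database' routes the whole batch through save_blob_data_batch
    blobs_info = store_api.save_blobs_to_store(
        blobs, context, max_size=10 * 1024 * 1024, store_type='database')

    for blob_id, (location_uri, size, checksums) in blobs_info.items():
        # e.g. 'sql://<blob_id>', 10, {'md5': ..., 'sha1': ..., 'sha256': ...}
        print(blob_id, location_uri, size, checksums['sha256'])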
diff --git a/glare/objects/meta/file_utils.py b/glare/objects/meta/file_utils.py
index 7c9a417..a60dfce 100644
--- a/glare/objects/meta/file_utils.py
+++ b/glare/objects/meta/file_utils.py
@@ -15,7 +15,6 @@
 
 """Contains additional file utils that may be useful for upload hooks."""
 
-import io
 import os
 import tempfile
 import zipfile
@@ -25,10 +24,8 @@ from oslo_log import log as logging
 from oslo_utils import excutils
 from oslo_utils import uuidutils
 
-from glare.common import exception
 from glare.common import store_api
 from glare.common import utils
-from glare.i18n import _
 from glare.objects.meta import fields as glare_fields
 
 CONF = cfg.CONF
@@ -65,6 +62,57 @@ def extract_zip_to_temporary_folder(tfile):
     return tdir
 
 
+def unpack_zip_archive_to_artifact_folder(context, af, zip_ref, folder_name):
+    """Unpack zip archive to artifact folder.
+
+    :param context: user context
+    :param af: artifact object
+    :param zip_ref: zip archive to be extracted
+    :param folder_name: name of the artifact folder where to extract data
+    """
+    file_dict = {}
+    blobs = []
+    for name in zip_ref.namelist():
+        if not name.endswith('/'):
+            blob_id = uuidutils.generate_uuid()
+            # create an empty blob instance in db with 'saving' status
+            blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
+                    'sha256': None, 'status': 'saving', 'id': blob_id,
+                    'external': False,
+                    'content_type': 'application/octet-stream'}
+            file_dict[name] = blob
+            blobs.append((blob_id, utils.BlobIterator(zip_ref.read(name))))
+
+    af = af.update_blob(context, af.id, folder_name, file_dict)
+
+    default_store = getattr(
+        CONF, 'artifact_type:' + af.get_type_name()).default_store
+    # use global parameter if default store isn't set per artifact type
+    if default_store is None:
+        default_store = CONF.glance_store.default_store
+
+    # try to perform blob uploading to storage backend
+    try:
+        blobs_info = store_api.save_blobs_to_store(
+            blobs, context, af.get_max_blob_size(folder_name),
+            default_store)
+        for name in zip_ref.namelist():
+            if not name.endswith('/'):
+                location_uri, size, checksums = blobs_info[
+                    file_dict[name]['id']]
+                # update blob info and activate it
+                file_dict[name].update({'url': location_uri,
+                                        'status': 'active',
+                                        'size': size})
+                file_dict[name].update(checksums)
+    except Exception:
+        # if upload failed remove blob from db and storage
+        with excutils.save_and_reraise_exception(logger=LOG):
+            af.update_blob(context, af.id, folder_name, None)
+
+    af.update_blob(context, af.id, folder_name, file_dict)
+
+
 def upload_content_file(context, af, data, blob_dict, key_name,
                         content_type='application/octet-stream'):
     """Upload a file to a blob dictionary.
@@ -109,31 +157,3 @@ def upload_content_file(context, af, data, blob_dict, key_name,
     blob.update(checksums)
     getattr(af, blob_dict)[key_name] = blob
     af.update_blob(context, af.id, blob_dict, getattr(af, blob_dict))
-
-
-def unpack_zip_archive_in_memory(context, af, field_name, fd):
-    """Unpack zip archive in memory and write its content to artifact folder.
-
-    :param context: user context
-    :param af: artifact object
-    :param field_name: blob dict name where to unpack the data
-    :param fd: zip archive
-    :return: io.BytesIO object - simple stream of in-memory bytes
-    """
-    flobj = io.BytesIO(fd.read(INMEMORY_OBJECT_SIZE_LIMIT))
-
-    # Raise exception if something left
-    data = fd.read(1)
-    if data:
-        msg = _("The zip you are trying to unpack is too big. "
-                "The system upper limit is %s") % INMEMORY_OBJECT_SIZE_LIMIT
-        raise exception.RequestEntityTooLarge(msg)
-
-    zip_ref = zipfile.ZipFile(flobj, 'r')
-    for name in zip_ref.namelist():
-        if not name.endswith('/'):
-            upload_content_file(
-                context, af, utils.BlobIterator(zip_ref.read(name)),
-                field_name, name)
-    flobj.seek(0)
-    return flobj
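A rough sketch (not part of the patch) of driving the new helper directly, assuming `context` and `af` are the request context and artifact object that Glare passes into an upload hook; the archive built here is made up. The real caller in this change is the Unpacker artifact's pre_upload_hook further down.

    import io
    import zipfile

    from glare.objects.meta import file_utils

    # build a tiny archive in memory instead of reading an uploaded one
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as zf:
        zf.writestr('scripts/install.sh', b'echo install')
        zf.writestr('scripts/cleanup.sh', b'echo cleanup')

    zip_ref = zipfile.ZipFile(buf, 'r')
    # every regular file in the archive becomes a blob in the 'content' folder:
    # created with status 'saving', then activated with url, size and checksums
    file_utils.unpack_zip_archive_to_artifact_folder(context, af, zip_ref,
                                                     'content')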
" - "The system upper limit is %s") % INMEMORY_OBJECT_SIZE_LIMIT - raise exception.RequestEntityTooLarge(msg) - - zip_ref = zipfile.ZipFile(flobj, 'r') - for name in zip_ref.namelist(): - if not name.endswith('/'): - upload_content_file( - context, af, utils.BlobIterator(zip_ref.read(name)), - field_name, name) - flobj.seek(0) - return flobj diff --git a/glare/store/database.py b/glare/store/database.py index 9edbd5f..6fe4b65 100644 --- a/glare/store/database.py +++ b/glare/store/database.py @@ -24,6 +24,10 @@ class DatabaseStoreAPI(base_api.BaseStoreAPI): session = db_api.get_session() return db_api.save_blob_data(context, blob_id, data, session) + def add_to_backend_batch(self, blobs, context, verifier=None): + session = db_api.get_session() + return db_api.save_blob_data_batch(context, blobs, session) + def get_from_store(self, uri, context): session = db_api.get_session() return db_api.get_blob_data(context, uri, session) diff --git a/glare/tests/unit/test_unpacking.py b/glare/tests/unit/test_unpacking.py index b70569e..c15b5c1 100644 --- a/glare/tests/unit/test_unpacking.py +++ b/glare/tests/unit/test_unpacking.py @@ -13,6 +13,7 @@ # limitations under the License. import os +from time import time from glare.tests.unit import base @@ -42,3 +43,38 @@ class TestArtifactHooks(base.BaseTestArtifactAPI): self.assertEqual(11, artifact['content']['folder1/bbb.txt']['size']) self.assertEqual( 11, artifact['content']['folder1/folder2/ccc.txt']['size']) + + def test_unpacking_database(self): + self.config(default_store='database', + group='artifact_type:unpacking_artifact') + self.test_unpacking() + + def test_unpacking_big_archive(self): + var_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + '../', 'var')) + data_path = os.path.join(var_dir, 'hooks_100.zip') + + # play rally - test that this test should pass faster than 3 seconds + start = time() + with open(data_path, "rb") as data: + self.controller.upload_blob( + self.req, 'unpacking_artifact', self.unpacking_artifact['id'], + 'zip', data, 'application/octet-stream') + end = time() + self.assertIs(True, (end - start) < 3, (end - start)) + + artifact = self.controller.show(self.req, 'unpacking_artifact', + self.unpacking_artifact['id']) + self.assertEqual(15702, artifact['zip']['size']) + self.assertEqual('active', artifact['zip']['status']) + + self.assertEqual(100, len(artifact['content'])) + + for blob in artifact['content'].values(): + self.assertEqual('active', blob['status']) + self.assertEqual(15, blob['size']) + + def test_unpacking_database_big_archive(self): + self.config(default_store='database', + group='artifact_type:unpacking_artifact') + self.test_unpacking_big_archive() diff --git a/glare/tests/unpacking_artifact.py b/glare/tests/unpacking_artifact.py index 2de5847..44a5df6 100644 --- a/glare/tests/unpacking_artifact.py +++ b/glare/tests/unpacking_artifact.py @@ -16,7 +16,6 @@ import io import zipfile from glare.common import exception -from glare.common import utils from glare.objects import base from glare.objects.meta import file_utils from glare.objects.meta import wrappers @@ -49,10 +48,9 @@ class Unpacker(base.BaseArtifact): raise exception.RequestEntityTooLarge(msg) zip_ref = zipfile.ZipFile(flobj, 'r') - for name in zip_ref.namelist(): - if not name.endswith('/'): - file_utils.upload_content_file( - context, af, utils.BlobIterator(zip_ref.read(name)), - 'content', name) + + file_utils.unpack_zip_archive_to_artifact_folder( + context, af, zip_ref, 'content') + flobj.seek(0) return flobj diff --git 
diff --git a/glare/tests/var/hooks_100.zip b/glare/tests/var/hooks_100.zip
new file mode 100644
index 0000000..accfffe
Binary files /dev/null and b/glare/tests/var/hooks_100.zip differ
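The binary fixture cannot be reproduced from the diff itself; the sketch below only illustrates how an archive with the shape asserted by test_unpacking_big_archive (100 files, 15 uncompressed bytes each) could be generated. The file names are assumptions, and the exact 15702-byte archive size depends on the committed hooks_100.zip, which remains the authoritative fixture.

    import zipfile

    # write 100 entries of exactly 15 bytes each into hooks_100.zip
    with zipfile.ZipFile('hooks_100.zip', 'w') as zf:
        for i in range(100):
            zf.writestr('file_%02d.txt' % i, b'0123456789abcde')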