Merge "Optimize zip unpacking"

Jenkins 2017-09-20 16:31:11 +00:00 committed by Gerrit Code Review
commit 2563f34425
7 changed files with 153 additions and 37 deletions

View File

@@ -90,6 +90,43 @@ def save_blob_to_store(blob_id, blob, context, max_size,
    return location, data.bytes_read, checksums

@utils.error_handler(error_map)
def save_blobs_to_store(blobs, context, max_size,
                        store_type=None, verifier=None):
    """Save several files to specified store.

    :param store_type: type of the store, None means save to default store.
    :param blobs: list of tuples (blob_data_id, data)
    :param context: user context
    :param verifier: signature verifier
    :return: dict {blob_data_id: (location_uri, size, checksums)}
    """
    # wrap data in CooperativeReader
    blobs = [(blob_data_id,
              utils.LimitingReader(utils.CooperativeReader(data), max_size))
             for (blob_data_id, data) in blobs]

    if store_type == 'database':
        locations = database_api.add_to_backend_batch(blobs, context, verifier)
    else:
        locations = []
        for blob_data_id, data in blobs:
            (location, __, __, __) = backend.add_to_backend(
                CONF, blob_data_id, data, 0, store_type, context, verifier)
            locations.append(location)

    # combine location, size and checksums together
    res = {}
    for i in range(len(locations)):
        data = blobs[i][1]
        checksums = {"md5": data.md5.hexdigest(),
                     "sha1": data.sha1.hexdigest(),
                     "sha256": data.sha256.hexdigest()}
        res[blobs[i][0]] = (locations[i], data.bytes_read, checksums)
    return res
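A hedged usage sketch of the new batch call (the blob ids, payloads, ctx and the size limit below are placeholders, not part of this change): a caller hands over a list of (blob_data_id, data) pairs and reads each blob's result back from the returned dict.

# illustrative only - ctx, the payloads and the 1024 limit are placeholders
blobs = [(uuidutils.generate_uuid(), utils.BlobIterator(b'first payload')),
         (uuidutils.generate_uuid(), utils.BlobIterator(b'second payload'))]
blobs_info = save_blobs_to_store(blobs, ctx, 1024, store_type='database')
for blob_id, (location, size, checksums) in blobs_info.items():
    # each entry carries the location URI, bytes read and md5/sha1/sha256
    print(blob_id, location, size, checksums['sha256'])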
@utils.error_handler(error_map)
def load_from_store(uri, context):
"""Load file from store backend.

View File

@@ -706,6 +706,27 @@ def save_blob_data(context, blob_data_id, data, session):
    return "sql://" + blob_data.id

@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def save_blob_data_batch(context, blobs, session):
    """Perform batch uploading to database."""
    with session.begin():
        locations = []

        # blobs is a list of tuples (blob_data_id, data)
        for blob_data_id, data in blobs:
            blob_data = models.ArtifactBlobData()
            blob_data.id = blob_data_id
            blob_data.data = data.read()
            session.add(blob_data)
            locations.append("sql://" + blob_data.id)

        session.flush()
    return locations
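A minimal sketch of driving the batch helper directly, assuming a session from get_session() in this module; the ids, payloads and context are invented placeholders, and in this change the function is only reached through DatabaseStoreAPI.add_to_backend_batch.

# illustrative only - ids, payloads and context are placeholders
import io

session = get_session()
blobs = [('blob-id-1', io.BytesIO(b'payload one')),
         ('blob-id-2', io.BytesIO(b'payload two'))]
locations = save_blob_data_batch(context, blobs, session)
# locations == ['sql://blob-id-1', 'sql://blob-id-2']

Because all rows are added inside a single session.begin() block and flushed once, a deadlock retry triggered by the @retry decorator replays the whole batch rather than a single row.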
@retry(retry_on_exception=_retry_on_deadlock, wait_fixed=500,
       stop_max_attempt_number=50)
def get_blob_data(context, uri, session):

View File

@@ -15,7 +15,6 @@
"""Contains additional file utils that may be useful for upload hooks."""
import io
import os
import tempfile
import zipfile
@@ -25,10 +24,8 @@ from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import uuidutils
from glare.common import exception
from glare.common import store_api
from glare.common import utils
from glare.i18n import _
from glare.objects.meta import fields as glare_fields
CONF = cfg.CONF
@@ -65,6 +62,57 @@ def extract_zip_to_temporary_folder(tfile):
    return tdir

def unpack_zip_archive_to_artifact_folder(context, af, zip_ref, folder_name):
    """Unpack zip archive to artifact folder.

    :param context: user context
    :param af: artifact object
    :param zip_ref: zip archive to be extracted
    :param folder_name: name of the artifact folder where to extract data
    """
    file_dict = {}
    blobs = []
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            blob_id = uuidutils.generate_uuid()
            # create an empty blob instance in db with 'saving' status
            blob = {'url': None, 'size': None, 'md5': None, 'sha1': None,
                    'sha256': None, 'status': 'saving', 'id': blob_id,
                    'external': False,
                    'content_type': 'application/octet-stream'}
            file_dict[name] = blob
            blobs.append((blob_id, utils.BlobIterator(zip_ref.read(name))))

    af = af.update_blob(context, af.id, folder_name, file_dict)

    default_store = getattr(
        CONF, 'artifact_type:' + af.get_type_name()).default_store
    # use global parameter if default store isn't set per artifact type
    if default_store is None:
        default_store = CONF.glance_store.default_store

    # try to perform blob uploading to storage backend
    try:
        blobs_info = store_api.save_blobs_to_store(
            blobs, context, af.get_max_blob_size(folder_name),
            default_store)
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                location_uri, size, checksums = blobs_info[
                    file_dict[name]['id']]
                # update blob info and activate it
                file_dict[name].update({'url': location_uri,
                                        'status': 'active',
                                        'size': size})
                file_dict[name].update(checksums)
    except Exception:
        # if upload failed remove blob from db and storage
        with excutils.save_and_reraise_exception(logger=LOG):
            af.update_blob(context, af.id, folder_name, None)

    af.update_blob(context, af.id, folder_name, file_dict)
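For orientation, a hedged sketch of feeding the new helper from a hook; ctx and af stand in for a real user context and artifact, and the archive content is invented.

# illustrative only - ctx and af are placeholders for a real context/artifact
import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('docs/readme.txt', b'hello glare')  # one regular file entry

zip_ref = zipfile.ZipFile(buf, 'r')
unpack_zip_archive_to_artifact_folder(ctx, af, zip_ref, 'content')
# the 'content' folder now holds an active blob for 'docs/readme.txt'
# with its url, size and md5/sha1/sha256 filled in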
def upload_content_file(context, af, data, blob_dict, key_name,
                        content_type='application/octet-stream'):
    """Upload a file to a blob dictionary.
@@ -109,31 +157,3 @@ def upload_content_file(context, af, data, blob_dict, key_name,
    blob.update(checksums)
    getattr(af, blob_dict)[key_name] = blob
    af.update_blob(context, af.id, blob_dict, getattr(af, blob_dict))

def unpack_zip_archive_in_memory(context, af, field_name, fd):
    """Unpack zip archive in memory and write its content to artifact folder.

    :param context: user context
    :param af: artifact object
    :param field_name: blob dict name where to unpack the data
    :param fd: zip archive
    :return: io.BytesIO object - simple stream of in-memory bytes
    """
    flobj = io.BytesIO(fd.read(INMEMORY_OBJECT_SIZE_LIMIT))

    # Raise exception if something left
    data = fd.read(1)
    if data:
        msg = _("The zip you are trying to unpack is too big. "
                "The system upper limit is %s") % INMEMORY_OBJECT_SIZE_LIMIT
        raise exception.RequestEntityTooLarge(msg)

    zip_ref = zipfile.ZipFile(flobj, 'r')
    for name in zip_ref.namelist():
        if not name.endswith('/'):
            upload_content_file(
                context, af, utils.BlobIterator(zip_ref.read(name)),
                field_name, name)
    flobj.seek(0)
    return flobj

View File

@@ -24,6 +24,10 @@ class DatabaseStoreAPI(base_api.BaseStoreAPI):
        session = db_api.get_session()
        return db_api.save_blob_data(context, blob_id, data, session)

    def add_to_backend_batch(self, blobs, context, verifier=None):
        session = db_api.get_session()
        return db_api.save_blob_data_batch(context, blobs, session)

    def get_from_store(self, uri, context):
        session = db_api.get_session()
        return db_api.get_blob_data(context, uri, session)

View File

@@ -13,6 +13,7 @@
#    limitations under the License.

import os
from time import time

from glare.tests.unit import base
@@ -42,3 +43,38 @@ class TestArtifactHooks(base.BaseTestArtifactAPI):
        self.assertEqual(11, artifact['content']['folder1/bbb.txt']['size'])
        self.assertEqual(
            11, artifact['content']['folder1/folder2/ccc.txt']['size'])

    def test_unpacking_database(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking()

    def test_unpacking_big_archive(self):
        var_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                               '../', 'var'))
        data_path = os.path.join(var_dir, 'hooks_100.zip')

        # play rally - the upload should finish in under 3 seconds
        start = time()
        with open(data_path, "rb") as data:
            self.controller.upload_blob(
                self.req, 'unpacking_artifact', self.unpacking_artifact['id'],
                'zip', data, 'application/octet-stream')
        end = time()
        self.assertIs(True, (end - start) < 3, (end - start))

        artifact = self.controller.show(self.req, 'unpacking_artifact',
                                        self.unpacking_artifact['id'])
        self.assertEqual(15702, artifact['zip']['size'])
        self.assertEqual('active', artifact['zip']['status'])

        self.assertEqual(100, len(artifact['content']))
        for blob in artifact['content'].values():
            self.assertEqual('active', blob['status'])
            self.assertEqual(15, blob['size'])

    def test_unpacking_database_big_archive(self):
        self.config(default_store='database',
                    group='artifact_type:unpacking_artifact')
        self.test_unpacking_big_archive()
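The timing test relies on the pre-built var/hooks_100.zip fixture (100 files of 15 bytes each). A hedged sketch of how an equivalent fixture could be regenerated; the member names and payload are assumptions, so the resulting archive will not necessarily match the 15702 bytes asserted above.

# assumption: member names and payload are invented, only the 15-byte size matters
import zipfile

with zipfile.ZipFile('hooks_100.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    for i in range(100):
        zf.writestr('file_%d.txt' % i, '0123456789abcde')  # 15 bytes each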

View File

@@ -16,7 +16,6 @@ import io
import zipfile

from glare.common import exception
from glare.common import utils
from glare.objects import base
from glare.objects.meta import file_utils
from glare.objects.meta import wrappers
@@ -49,10 +48,9 @@ class Unpacker(base.BaseArtifact):
            raise exception.RequestEntityTooLarge(msg)

        zip_ref = zipfile.ZipFile(flobj, 'r')
        for name in zip_ref.namelist():
            if not name.endswith('/'):
                file_utils.upload_content_file(
                    context, af, utils.BlobIterator(zip_ref.read(name)),
                    'content', name)
        file_utils.unpack_zip_archive_to_artifact_folder(
            context, af, zip_ref, 'content')
        flobj.seek(0)
        return flobj

Binary file not shown.