Implement streaming of the all-reports archive

To provide all reports at once, we generate them, archive them,
and transmit the result as a single file.
Generating and archiving all the reports can take a long time
and cause an HTTP server connection timeout.
To prevent such errors, reports are now generated lazily
and the data is streamed on the fly without saving it into files.

- tar archive is used instead of zip
- streaming of the tar archive is implemented (sketched below)
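
The core pattern, as a minimal self-contained sketch (not the patch
code itself; it assumes each report is an iterator of text rows and
drains an in-memory buffer after every archived file):

    import io
    import tarfile

    def stream_tar(reports):
        """Yield tar archive chunks for (rows, file name) pairs."""
        buf = io.BytesIO()
        with tarfile.open(mode='w', fileobj=buf) as tar:
            for rows, name in reports:
                payload = io.BytesIO()
                for row in rows:              # rows are produced lazily
                    payload.write(row.encode('utf-8'))
                info = tarfile.TarInfo(name)
                info.size = payload.tell()
                payload.seek(0)
                tar.addfile(info, payload)
                yield buf.getvalue()          # hand finished bytes downstream
                buf.seek(0)
                buf.truncate()                # keep the buffer bounded
        yield buf.getvalue()                  # trailing end-of-archive blocks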

Change-Id: I8a71c462cf6ad61ef5b5798770d140866d36edb1
Closes-Bug: #1487356
Alexander Kislitsky 2015-08-21 16:23:42 +03:00 committed by Alexander Kislitsky
parent 2419e947b6
commit 5c08d62ec4
2 changed files with 110 additions and 126 deletions


@@ -14,17 +14,16 @@
from datetime import datetime
from datetime import timedelta
import io
import six
import tarfile
from flask import Blueprint
from flask import request
from flask import Response
from flask import send_file
import os
import shutil
from sqlalchemy import distinct
from sqlalchemy import or_
from sqlalchemy import sql
import tempfile
import zipfile
from fuel_analytics.api.app import app
from fuel_analytics.api.app import db
@@ -99,10 +98,8 @@ def get_inst_structures():
to_date=to_date).yield_per(yield_per)
def get_action_logs_query(from_date, to_date):
def get_action_logs_query():
"""Selecting only last network verification task for master node cluster
:param from_date: filter from creation or modification date
:param to_date: filter to creation or modification date
:return: SQLAlchemy query
"""
query = "SELECT DISTINCT ON (master_node_uid, body->>'cluster_id') " \
@@ -119,14 +116,15 @@ def get_action_logs_query(from_date, to_date):
"AND to_timestamp(body->>'end_timestamp', 'YYYY-MM-DD')::" \
"TIMESTAMP WITHOUT TIME ZONE <= :to_date " \
"ORDER BY master_node_uid, body->>'cluster_id', external_id DESC"
return db.session.execute(
sql.text(query), {'from_date': from_date, 'to_date': to_date})
return sql.text(query)
def get_action_logs():
from_date = get_from_date()
to_date = get_to_date()
return get_action_logs_query(from_date, to_date)
query = get_action_logs_query()
return db.session.execute(query, {'from_date': from_date,
'to_date': to_date})
@bp.route('/clusters', methods=['GET'])
@@ -229,78 +227,90 @@ def get_resources_types():
return (row[0] for row in result)
def save_all_reports(tmp_dir):
"""Saves all available CSV reports into single directory
:param tmp_dir: path to target directory
def get_all_reports(from_date, to_date):
"""Returns generator on all reports info.
:param from_date: get reports from date
:param to_date: get reports to date
:return: generator on sequence of tuples (report data, report name)
"""
app.logger.debug("Saving all reports to %s", tmp_dir)
app.logger.debug("Getting all reports")
stats_exporter = StatsToCsv()
oswl_exporter = OswlStatsToCsv()
resources_types = get_resources_types()
inst_strucutres = get_inst_structures()
with open(os.path.join(tmp_dir, CLUSTERS_REPORT_FILE), mode='w') as f:
app.logger.debug("Getting installation structures started")
action_logs = get_action_logs()
clusters = stats_exporter.export_clusters(inst_strucutres,
action_logs)
f.writelines(clusters)
app.logger.debug("Getting installation structures finished")
with open(os.path.join(tmp_dir, PLUGINS_REPORT_FILE), mode='w') as f:
app.logger.debug("Getting plugins started")
plugins = stats_exporter.export_plugins(inst_strucutres)
f.writelines(plugins)
app.logger.debug("Getting plugins finished")
# OSWLs reports
for resource_type in resources_types:
app.logger.debug("Getting resource '%s' started", resource_type)
app.logger.debug("Getting report '%s'", resource_type)
oswls = get_oswls_query(resource_type, from_date=from_date,
to_date=to_date)
report = oswl_exporter.export(resource_type, oswls, to_date)
app.logger.debug("Report '%s' got", resource_type)
yield report, '{}.csv'.format(resource_type)
file_name = os.path.join(tmp_dir, '{}.csv'.format(resource_type))
oswls = get_oswls(resource_type)
with open(file_name, mode='w') as f:
resources = oswl_exporter.export(
resource_type, oswls, get_to_date())
f.writelines(resources)
app.logger.debug("Getting resource '%s' finished", resource_type)
app.logger.debug("All reports saved into %s", tmp_dir)
# Clusters report
app.logger.debug("Getting clusters report")
inst_strucutres = get_inst_structures_query(from_date=from_date,
to_date=to_date)
query_action_logs = get_action_logs_query()
action_logs = db.session.execute(query_action_logs,
{'from_date': from_date,
'to_date': to_date})
clusters = stats_exporter.export_clusters(inst_strucutres,
action_logs)
app.logger.debug("Clusters report got")
yield clusters, CLUSTERS_REPORT_FILE
# Plugins report
app.logger.debug("Getting plugins report")
plugins = stats_exporter.export_plugins(inst_strucutres)
app.logger.debug("Plugins report got")
yield plugins, PLUGINS_REPORT_FILE
app.logger.debug("All reports got")
def archive_dir(dir_path):
"""Archives directory to zip file
:param dir_path: path to target directory
:return: ZipFile object
def stream_reports_tar(reports):
"""Streams reports data as tar archive.
:param reports: generator of collection of tuples
(report data, report name)
:return: streamed reports tar archive
"""
app.logger.debug("Dir '%s' archiving started", dir_path)
tmp_file = tempfile.NamedTemporaryFile(delete=False)
with zipfile.ZipFile(tmp_file, 'w', zipfile.ZIP_STORED) as archive:
for root, dirs, files in os.walk(dir_path):
for f in files:
archive.write(os.path.join(root, f), arcname=f)
app.logger.debug("Dir '%s' archiving to '%s' finished",
dir_path, archive.filename)
return archive
app.logger.debug("Streaming reports as tar archive started")
tar_stream = six.StringIO()
with tarfile.open(None, mode='w', fileobj=tar_stream) as f:
for report, report_name in reports:
app.logger.debug("Streaming report %s", report_name)
stream = six.StringIO()
info = tarfile.TarInfo(report_name)
for row in report:
stream.write(row)
info.size = stream.tell()
stream.seek(io.SEEK_SET)
f.addfile(info, stream)
tar_stream.seek(io.SEEK_SET)
yield tar_stream.getvalue()
tar_stream.seek(io.SEEK_SET)
tar_stream.truncate()
app.logger.debug("Streaming reports as tar archive finished")
@bp.route('/all', methods=['GET'])
def all_reports():
"""Single report for all resource types and clusters info
:return: zip archive of CSV reports
"""Single report for all resource types, clusters and plugins info
:return: tar archive of CSV reports
"""
app.logger.debug("Handling all_reports get request")
tmp_dir = tempfile.mkdtemp()
try:
save_all_reports(tmp_dir)
try:
archive = archive_dir(tmp_dir)
name = 'reports_from{}_to{}.zip'.format(
get_from_date(), get_to_date())
return send_file(archive.filename, mimetype='application/zip',
as_attachment=True, attachment_filename=name)
finally:
app.logger.debug("Removing temporary archive")
os.unlink(archive.filename)
finally:
app.logger.debug("Removing temporary directory %s", tmp_dir)
shutil.rmtree(tmp_dir, ignore_errors=True)
app.logger.debug("Request all_reports handled")
from_date = get_from_date()
to_date = get_to_date()
reports = get_all_reports(from_date, to_date)
name = 'reports_from{}_to{}'.format(get_from_date(), get_to_date())
headers = {
'Content-Disposition': 'attachment; filename={}.tar'.format(name)
}
return Response(stream_reports_tar(reports),
mimetype='application/x-tar', headers=headers)
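
A client only needs to stream the response body to disk. A hypothetical
example with requests (the base URL is an assumption; the actual prefix
of the '/all' route depends on how the blueprint is registered):

    import requests

    BASE_URL = 'http://localhost:5000'  # assumed; depends on deployment

    resp = requests.get(BASE_URL + '/all',
                        params={'from_date': '2015-07-01',
                                'to_date': '2015-08-21'},
                        stream=True)
    resp.raise_for_status()
    with open('reports.tar', 'wb') as f:
        for chunk in resp.iter_content(chunk_size=64 * 1024):
            f.write(chunk)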


@@ -16,14 +16,8 @@
from datetime import datetime
from datetime import timedelta
import flask
from flask import request
import itertools
import mock
import os
import shutil
import tempfile
import zipfile
from fuel_analytics.test.api.resources.utils.oswl_test import OswlTest
from fuel_analytics.test.base import DbTest
@@ -33,16 +27,15 @@ from fuel_analytics.api.app import db
from fuel_analytics.api.common import consts
from fuel_analytics.api.db.model import ActionLog
from fuel_analytics.api.errors import DateExtractionError
from fuel_analytics.api.resources.csv_exporter import archive_dir
from fuel_analytics.api.resources import csv_exporter as ce
from fuel_analytics.api.resources.csv_exporter import extract_date
from fuel_analytics.api.resources.csv_exporter import get_action_logs_query
from fuel_analytics.api.resources.csv_exporter import get_action_logs
from fuel_analytics.api.resources.csv_exporter import get_from_date
from fuel_analytics.api.resources.csv_exporter import get_inst_structures
from fuel_analytics.api.resources.csv_exporter import get_inst_structures_query
from fuel_analytics.api.resources.csv_exporter import get_oswls_query
from fuel_analytics.api.resources.csv_exporter import get_resources_types
from fuel_analytics.api.resources.csv_exporter import get_to_date
from fuel_analytics.api.resources.csv_exporter import save_all_reports
from fuel_analytics.api.resources.utils.stats_to_csv import ActionLogInfo
from fuel_analytics.api.resources.utils.stats_to_csv import StatsToCsv
@@ -198,65 +191,43 @@ class CsvExporterTest(OswlTest, DbTest):
resources_names = get_resources_types()
self.assertItemsEqual(self.RESOURCE_TYPES, resources_names)
def test_save_all_reports(self):
def test_get_all_reports(self):
oswls = []
for resource_type in self.RESOURCE_TYPES:
oswls.extend(self.get_saved_oswls(10, resource_type))
self.get_saved_inst_structs(oswls)
tmp_dir = tempfile.mkdtemp()
try:
with app.test_request_context():
save_all_reports(tmp_dir)
files = itertools.chain(('clusters', ), self.RESOURCE_TYPES)
for f in files:
path = os.path.join(tmp_dir, '{}.csv'.format(f))
self.assertTrue(os.path.isfile(path), path)
finally:
shutil.rmtree(tmp_dir)
def test_save_all_reports_with_future_dates(self):
to_date = datetime.utcnow()
from_date = to_date - timedelta(days=30)
reports = ce.get_all_reports(from_date, to_date)
expected_reports = [
ce.CLUSTERS_REPORT_FILE,
ce.PLUGINS_REPORT_FILE
]
for resource_type in self.RESOURCE_TYPES:
expected_reports.append('{}.csv'.format(resource_type))
actual_reports = [name for _, name in reports]
self.assertItemsEqual(expected_reports, actual_reports)
def test_get_all_reports_with_future_dates(self):
oswls = []
for resource_type in self.RESOURCE_TYPES:
oswls.extend(self.get_saved_oswls(10, resource_type))
self.get_saved_inst_structs(oswls)
tmp_dir = tempfile.mkdtemp()
try:
to_date = datetime.utcnow() + timedelta(days=7)
to_data_str = to_date.strftime('%Y-%m-%d')
with app.test_request_context(), mock.patch.object(
flask.request, 'args', {'to_date': to_data_str}):
save_all_reports(tmp_dir)
from_date = datetime.utcnow()
to_date = from_date + timedelta(days=7)
files = itertools.chain(('clusters', ), self.RESOURCE_TYPES)
for f in files:
path = os.path.join(tmp_dir, '{}.csv'.format(f))
self.assertTrue(os.path.isfile(path), path)
finally:
shutil.rmtree(tmp_dir)
reports_generators = ce.get_all_reports(from_date, to_date)
def test_archive_dir(self):
oswls = []
for resource_type in self.RESOURCE_TYPES:
oswls.extend(self.get_saved_oswls(10, resource_type))
self.get_saved_inst_structs(oswls)
tmp_dir = tempfile.mkdtemp()
try:
with app.test_request_context():
save_all_reports(tmp_dir)
files = itertools.chain(('clusters', ), self.RESOURCE_TYPES)
for f in files:
path = os.path.join(tmp_dir, '{}.csv'.format(f))
self.assertTrue(os.path.isfile(path), path)
archive = archive_dir(tmp_dir)
try:
self.assertTrue(zipfile.is_zipfile(archive.filename))
finally:
os.unlink(archive.filename)
finally:
shutil.rmtree(tmp_dir)
# Checking no exception raised
for report_generator, report_name in reports_generators:
for _ in report_generator:
pass
def test_get_action_logs_query(self):
def test_get_action_logs(self):
action_name = StatsToCsv.NETWORK_VERIFICATION_ACTION
action_logs = [
ActionLog(
@@ -317,8 +288,11 @@ class CsvExporterTest(OswlTest, DbTest):
for action_log in action_logs:
db.session.add(action_log)
db.session.commit()
to_date = from_date = datetime.utcnow().date()
action_logs = list(get_action_logs_query(from_date, to_date))
to_date = from_date = datetime.utcnow().date().strftime('%Y-%m-%d')
with app.test_request_context():
with mock.patch.object(request, 'args', {'from_date': from_date,
'to_date': to_date}):
action_logs = list(get_action_logs())
# Checking no old and no_end_ts action logs
for action_log in action_logs: